Big Data Tutorial: Map Reduce
NOTICE: This page refers to the Peel Hadoop Cluster which was retired in 2023 and is in the process of being archived. Its contents should not be referenced for Dataproc Courses.
What is Hadoop?
Hadoop is an open-source software framework for storing and processing big data in a distributed/parallel fashion on large clusters of commodity hardware. Essentially, it accomplishes two tasks: massive data storage and faster processing. The core of Hadoop consists of HDFS and Hadoop's implementation of MapReduce.
What is HDFS?
HDFS stands for Hadoop Distributed File System. HDFS is a highly fault-tolerant file system and is designed to be deployed on low-cost hardware. HDFS provides high throughput access to application data and is suitable for applications that have large data sets.
What is Map-Reduce?
MapReduce is a programming model and an associated implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster.
Let's go to the slide deck for more information: MapReduce
Phases in MapReduce
A MapReduce job splits a large data set into independent chunks and organizes them into key-value pairs for parallel processing. A key-value pair (KVP) is a set of two linked data items: a key, which is a unique identifier for some item of data, and the value, which is either the data that is identified or a pointer to the location of that data. The mapping and reducing functions receive not just values, but (key, value) pairs. This parallel processing improves the speed and reliability of the cluster, returning solutions more quickly.
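For a concrete (if simplified) picture, here is what a word-count mapper's output might look like as key-value pairs. This is plain illustrative Python, not Hadoop API code, and the sample line is made up:

```python
# Illustrative sketch: the (key, value) pairs a word-count mapper would emit.
line = "hadoop stores data and hadoop processes data"

# Each word becomes a key; the value 1 records one occurrence of it.
pairs = [(word, 1) for word in line.split()]

print(pairs)
# ('hadoop', 1) appears twice -- a reducer will later sum those 1s.
```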
Every MapReduce job consists of at least three parts:
The Driver
The Mapper
The Reducer
Mapping Phase
The first phase of a MapReduce program is called mapping. A list of data elements is provided, one at a time, to a function called the Mapper, which transforms each element individually into an output data element.
The map phase divides the input into ranges according to the InputFormat and creates a map task for each range. The JobTracker distributes those tasks to the worker nodes. The output of each map task is partitioned into a group of key-value pairs for each reducer.
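The mapping step can be sketched in a few lines of Python. The function name `map_words` and the sample records are made up for illustration; real Hadoop mappers are written against the Java (or streaming) API:

```python
def map_words(record):
    """Mapper sketch: transform one input record (a line of text)
    into a list of (word, 1) key-value pairs."""
    return [(word, 1) for word in record.split()]

# Each record is mapped independently of every other record,
# which is what lets Hadoop run map tasks in parallel across nodes.
records = ["the quick brown fox", "the lazy dog"]
mapped = [pair for record in records for pair in map_words(record)]
print(mapped)
```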
Reducing Phase
Reducing lets you aggregate values together. A reducer function receives an iterator of input values from an input list. It then combines these values together, returning a single output value.
The Reduce function then collects the various results and combines them to answer the larger problem that the master node needs to solve. Each reduce pulls the relevant partition from the machines where the maps executed, then writes its output back into HDFS. Thus, the reduce is able to collect the data from all of the maps for the keys and combine them to solve the problem.
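Continuing the sketch in illustrative Python: between map and reduce, the framework's shuffle groups all values that share a key, and the reducer then collapses each group into one output value. The function name `reduce_counts` and sample data are made up:

```python
from collections import defaultdict

def reduce_counts(key, values):
    """Reducer sketch: combine the list of values for one key
    into a single output value."""
    return key, sum(values)

# Stand-in for the shuffle the framework performs between map and
# reduce: group together all values that share a key.
mapped = [("the", 1), ("quick", 1), ("the", 1), ("dog", 1)]
grouped = defaultdict(list)
for key, value in mapped:
    grouped[key].append(value)

results = [reduce_counts(k, v) for k, v in sorted(grouped.items())]
print(results)
```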
MapReduce Data Flow
What is Peel?
Peel is the stand-alone Hadoop cluster running Cloudera CDH version 6.3.4. Cloudera Enterprise (CDH) combines Apache Hadoop 3.0.0 and Apache Spark 2.4.0 with a number of other open-source projects to create a single, massively scalable system where you can unite storage with an array of powerful processing and analytic frameworks.
Commands for HDFS & MapReduce
To access Peel, the Hadoop cluster:
Windows:
Step - 1: Connect to NYU VPN
Step - 2: Use hostname "peel.hpc.nyu.edu" with port 22 in PuTTY and provide your credentials.
Mac:
Step - 1: Connect to NYU VPN
Step - 2: Then, from a terminal, use "ssh <net_id>@peel.hpc.nyu.edu" to connect to the Peel cluster.
For more information, please follow the instructions on the Hadoop User Guide.
HDFS COMMANDS
TO UPLOAD DATA TO HDFS
hadoop fs -put <filename_in_lfs> <hdfs_name>
or
hadoop fs -copyFromLocal <filename_in_lfs> <hdfs_name>
or
hdfs dfs -put <filename_in_lfs> <hdfs_name>
TO GET DATA FROM HDFS
hadoop fs -get <hdfs_name> <filename_in_lfs>
or
hadoop fs -copyToLocal <hdfs_name> <filename_in_lfs>
TO CHECK HDFS FOR YOUR FILE
hadoop fs -ls
MAPREDUCE COMMANDS
usage: hadoop [--config confdir] COMMAND
where COMMAND is one of:
  fs            run a generic filesystem user client
  version       print the version
  jar <jar>     run a jar file
  distcp <srcurl> <desturl>   copy file or directories recursively
  archive -archiveName NAME -p <parent path> <src>* <dest>   create a hadoop archive
  classpath     prints the class path needed to get the Hadoop jar and the required libraries
  daemonlog     get/set the log level for each daemon
 or
  CLASSNAME     run the class named CLASSNAME
To trigger the job
hadoop jar <jarfilename>.jar <DriverClassName> <ip_file_in_HDFS> <op_dir_name>
To check running jobs
hadoop job -list
or
yarn application -list
To kill a job
hadoop job -kill <job_id>
or
yarn application -kill <job_id>
Examples for Map-Reduce Jobs
Example 1:
Word Count: The objective here is to count the number of occurrences of each word by using key-value pairs.
Step 1:
Mac Users: ssh <net_id>@peel.hpc.nyu.edu
or
Windows Users: Use Putty
Step 2:
Copy example1 folder to /home/<net_id>/
cp -r /scratch/work/public/peel/tutorials/Tutorial1/example1 /home/<net_id>/
cd /home/<net_id>/example1
Step 3:
Place the book.txt file on to hdfs
hadoop fs -put /home/<net_id>/example1/book.txt /user/<net_id>/book.txt
Step 4:
Compile code with java compiler and create a jar file using generated class files.
export HADOOP_LIPATH=/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6626826/lib
javac -classpath $HADOOP_LIPATH/hadoop/*:$HADOOP_LIPATH/hadoop-mapreduce/*:$HADOOP_LIPATH/hadoop-hdfs/* *.java
jar cvf WordCount.jar *.class
Step 5:
Run the MapReduce job using WordCount.jar
hadoop jar WordCount.jar WordCount /user/<net_id>/book.txt /user/<net_id>/wordcountoutput
Step 6:
Check output by accessing HDFS directories
hadoop fs -ls /user/<net_id>/wordcountoutput
hadoop fs -cat /user/<net_id>/wordcountoutput/part-r-00000
OR
hadoop fs -getmerge /user/<net_id>/wordcountoutput $HOME/output.txt
cat $HOME/output.txt
Example 2:
Standard Deviation: The objective is to find the standard deviation of the length of the words.
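The job itself ships as a pre-built jar, but the arithmetic it performs can be sketched: map each word to its length, then reduce the lengths to a (population) standard deviation from their count, sum, and sum of squares. This is illustrative Python with made-up function names and sample records, not the jar's actual code:

```python
import math

def map_lengths(record):
    """Mapper sketch: emit the length of every word in a record."""
    return [len(word) for word in record.split()]

def reduce_stddev(lengths):
    """Reducer sketch: population standard deviation computed from
    the count, sum, and sum of squares of the mapped lengths."""
    n = len(lengths)
    total = sum(lengths)
    total_sq = sum(x * x for x in lengths)
    mean = total / n
    return math.sqrt(total_sq / n - mean * mean)

records = ["hadoop runs jobs", "mapreduce scales out"]
lengths = [l for record in records for l in map_lengths(record)]
print(round(reduce_stddev(lengths), 4))
```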
Step 1:
Copy example2 folder to /home/<net_id>/
cp -r /scratch/work/public/peel/tutorials/Tutorial1/example2 /home/<net_id>/
example2.txt - Input file
StandardDeviation.jar - compiled jar file
Step 2:
Place the example2.txt file on to hdfs
hadoop fs -put /home/<net_id>/example2/example2.txt /user/<net_id>/example2.txt
Step 3:
Run the MapReduce job using StandardDeviation.jar
hadoop jar StandardDeviation.jar wordstandarddeviation /user/<net_id>/example2.txt /user/<net_id>/standarddeviationoutput
Step 4:
Check output by accessing HDFS directories
hadoop fs -ls /user/<net_id>/standarddeviationoutput
hadoop fs -cat /user/<net_id>/standarddeviationoutput/part-r-00000
Example 3:
Step 1:
Create a directory to work with example3
mkdir /home/<net_id>/example3
cd /home/<net_id>/example3
Step 2:
Copy the input to the local directory, then to HDFS.
cp /scratch/work/public/peel/tutorials/Tutorial1/example3/MapReduce-master/examples/inputComments.xml /home/<net_id>/example3/
hadoop fs -put /home/<net_id>/example3/inputComments.xml /user/<net_id>/
Step 3:
Clone a git repository to create a local copy of the code
git clone https://github.com/geftimov/MapReduce.git
cd /home/<net_id>/example3/MapReduce
Step 4:
Build/compile using maven. Make sure pom.xml is present in the same directory. This command will generate the "target" directory.
/share/apps/peel/maven/3.5.2/bin/mvn install
cd target
Step 5:
Extract jar file. This command creates directory "com".
jar -xvf MapReduce-0.0.1-SNAPSHOT.jarStep 6:
Execute the process using the class files created in directory "com".
export JAVA_CLASS=com/eftimoff/mapreduce/summarization/numerical/average
hadoop jar MapReduce-0.0.1-SNAPSHOT.jar $JAVA_CLASS/Average /user/<net_id>/inputComments.xml /user/<net_id>/AverageOutput
Step 7:
Check output by accessing HDFS directories.
hadoop fs -ls /user/<net_id>/AverageOutput
hadoop fs -cat /user/<net_id>/AverageOutput/part-r-00000
MapReduce Streaming
Even though the Hadoop framework is written in Java, programs for Hadoop need not be coded in Java but can also be developed in other languages like Python, shell scripts or C++. Hadoop streaming is a utility that comes with the Hadoop distribution. This utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.
Streaming runs a MapReduce job from the command line. You specify a map script, a reduce script, an input, and an output; Streaming takes care of the MapReduce details, such as splitting your job into separate tasks and executing the map tasks on the nodes where the data is stored. Note that Hadoop Streaming works a little differently from the Java API: your program is not handed one record at a time, so you have to iterate over the input records yourself.
-input – The data in hdfs that you want to process
-output – The directory in hdfs where you want to store the output
-mapper – the program, script, or command that you want to use for your mapper
-reducer – the program, script, or command that you want to use for your reducer
-file – Make the mapper, reducer, or combiner executable available locally on the compute nodes.
There is an example of Hadoop streaming at /share/apps/examples/hadoop-streaming on Peel. The README file explains how to run the example and where to find hadoop-streaming.jar.
Command used to run a MapReduce job using streaming:
cp -r /share/apps/examples/hadoop-streaming $HOME/example/
cd $HOME/example/
An example of how to run a Hadoop-streaming job is:
export HADOOP_LIPATH=/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6626826/lib
hadoop jar $HADOOP_LIPATH/hadoop-mapreduce/hadoop-streaming.jar -numReduceTasks 2 -file $HOME/example -mapper example/mapper.py -reducer example/reducer.py -input /user/<net_id>/book.txt -output /user/<net_id>/example.out
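The mapper.py and reducer.py shipped with the Peel example may differ, but a typical streaming word-count pair looks roughly like this single-file sketch. In a real streaming job the two halves are separate executables that read stdin and write stdout, with the framework sorting the mapper output by key in between; here the pipeline is simulated inline on made-up input:

```python
from itertools import groupby

def mapper(lines):
    """mapper.py sketch: emit 'word<TAB>1' for every word in the input."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"

def reducer(lines):
    """reducer.py sketch: streaming input arrives sorted by key, so
    consecutive lines sharing a word can be summed with groupby."""
    parsed = (line.split("\t") for line in lines)
    for word, group in groupby(parsed, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"

# Stand-in for: cat input | ./mapper.py | sort | ./reducer.py
mapped = sorted(mapper(["to be or not to be"]))
for line in reducer(mapped):
    print(line)
```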