Program #1: The aim of this program is to find the maximum temperature recorded for each year of NCDC weather data.
The input to the program is a set of weather data files, one per year. This data is collected by the National Climatic Data Center (NCDC) from weather sensors all over the world. Weather data for each year can be downloaded from ftp://ftp.ncdc.noaa.gov/pub/data/noaa/. The files are zipped by year and by weather station, so for each year there are multiple files, one per weather station.
Dataset Description
Each file contains records that look like this:
0029029070999991905010106004+64333+023450FM-12+000599999V0202301N008219999999N0000001N9-01391+99999102641ADDGF102991999999999999999999
In this record, the first field of interest (029070) is the USAF weather station identifier and the next (19050101) is the observation date. The temperature field (-0139) holds the air temperature in tenths of a degree Celsius, so this reading equates to -13.9 degrees Celsius; the digit immediately after it is the quality code for the reading.
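For reference, these are the character positions that the mapper below relies on when parsing a record (the offsets correspond exactly to its substring calls):
line.substring(15, 19)  → year of the observation (e.g. "1905")
line.charAt(87)         → sign of the air temperature ('+' or '-')
line.substring(88, 92)  → air temperature in tenths of a degree Celsius
line.substring(92, 93)  → quality code of the temperature reading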
Implementation
MapReduce works on key/value pairs, so first we have to decide on the types of the key/value pairs for the input and output.
Map Phase: The input to the map phase is the set of weather data files. The input key/value types are LongWritable and Text (the byte offset and the text of each line), and the output key/value types are Text and IntWritable. Each map task extracts the year and the temperature from every record of its file, so the output of the map phase is a set of key/value pairs whose keys are years and whose values are temperature readings from those years.
Reduce Phase: The reduce phase receives all the values associated with a particular key; that is, all the temperature readings belonging to a particular year are fed to the same reducer. Each reducer then finds the highest recorded temperature for its year. The output key/value types of the map phase must match the input key/value types of the reduce phase (Text and IntWritable), and the output key/value types of the reduce phase are also Text and IntWritable.
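As a small illustration of this flow (the readings below are made-up values in tenths of a degree, not actual entries from the dataset):
Map output:    (1901, 317), (1901, -45), (1902, 244), (1902, 200)
Reduce input:  (1901, [317, -45]), (1902, [244, 200])
Reduce output: (1901, 31), (1902, 24)   — the reducer below divides by 10 to report whole degrees.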
So, in this example we write three Java classes:
HighestMapper.java
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class HighestMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
{
    // Value used in the NCDC records when no temperature reading is available
    public static final int MISSING = 9999;

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
    {
        String line = value.toString();
        // Characters 15-18 hold the year of the observation date
        String year = line.substring(15, 19);
        int temperature;
        // The air temperature (in tenths of a degree Celsius) starts at character 87 with an optional sign
        if (line.charAt(87) == '+')
            temperature = Integer.parseInt(line.substring(88, 92));
        else
            temperature = Integer.parseInt(line.substring(87, 92));
        // Character 92 is the quality code of the reading
        String quality = line.substring(92, 93);
        // Emit (year, temperature) only for readings that are present and of acceptable quality
        if (temperature != MISSING && quality.matches("[01459]"))
            output.collect(new Text(year), new IntWritable(temperature));
    }
}
HighestReducer.java
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class HighestReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
{
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
    {
        // Start below any possible reading so that years whose readings are all negative are handled correctly
        int max_temp = Integer.MIN_VALUE;
        while (values.hasNext())
        {
            int current = values.next().get();
            if (max_temp < current)
                max_temp = current;
        }
        // Readings are in tenths of a degree, so divide by 10 to report whole degrees Celsius
        output.collect(key, new IntWritable(max_temp / 10));
    }
}
HighestDriver.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class HighestDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception
    {
        JobConf conf = new JobConf(getConf(), HighestDriver.class);
        conf.setJobName("HighestDriver");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(HighestMapper.class);
        conf.setReducerClass(HighestReducer.class);
        Path inp = new Path(args[1]);
        Path out = new Path(args[2]);
        FileInputFormat.addInputPath(conf, inp);
        FileOutputFormat.setOutputPath(conf, out);
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new HighestDriver(), args);
        System.exit(res);
    }
}
Build the jar file using any IDE, such as NetBeans or Eclipse.
Note: add all the jars in the /usr/local/hadoop/share/hadoop/common and /usr/local/hadoop/share/hadoop/mapreduce folders to your project in the IDE.
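If you prefer to build from the command line instead of an IDE, something along these lines should also work (the directory and jar names here are just examples):
mkdir highest_classes
javac -classpath $(hadoop classpath) -d highest_classes HighestMapper.java HighestReducer.java HighestDriver.java
jar -cvf HighestDriver.jar -C highest_classes .
The hadoop classpath command prints the jars Hadoop itself uses, which covers the common and mapreduce libraries mentioned above.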
Copy the input files from the local file system to HDFS
Syntax:
hadoop fs -copyFromLocal <source_path (local file system)> <destination_path (HDFS)>
hduser@MKP:/usr/local/hadoop/sbin$ hadoop fs -copyFromLocal /home/mkp/Desktop/1901 /HighestTemp/input
(OR)
hduser@MKP:/usr/local/hadoop/sbin$ hadoop fs -copyFromLocal /home/mkp/Desktop/190* /HighestTemp/input
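If the target directory does not exist yet, create it first and verify the copy afterwards:
hadoop fs -mkdir -p /HighestTemp/input
hadoop fs -ls /HighestTemp/input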
Executing the jar file
Syntax:
hadoop jar <jar_file_path> <main_class_name> <input_file_or_directory_path> <output_directory_path>
hduser@MKP:/usr/local/hadoop/sbin$ hadoop jar /home/mkp/Desktop/HighestDriver.jar HighestDriver /HighestTemp/input/ /HighestTemp/output
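Once the job finishes, the result can be read directly from HDFS; with a single reducer the old mapred API writes it to a file named part-00000 inside the output directory:
hadoop fs -cat /HighestTemp/output/part-00000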
Expected Output:
1901 45
1902 40
1903 46
Program #2: Find the number of products sold in each country
Dataset Description
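The input is a comma-separated sales-records file; as the mapper below assumes, the country name of each sale appears in the eighth comma-separated field (index 7) of the record.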
SalesCountryMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
public class SalesCountryMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String valueString = value.toString();
        // Each record is a comma-separated sales entry; the country name is the eighth field (index 7)
        String[] SingleCountryData = valueString.split(",");
        // Emit (country, 1) for every sale
        output.collect(new Text(SingleCountryData[7]), one);
    }
}
SalesCountryReducer.java
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        Text key = t_key;
        int frequencyForCountry = 0;
        // Sum the counts emitted by the mappers for this country
        while (values.hasNext()) {
            IntWritable value = values.next();
            frequencyForCountry += value.get();
        }
        output.collect(key, new IntWritable(frequencyForCountry));
    }
}
SalesCountryDriver.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);
        // Set a name for the job
        job_conf.setJobName("SalesCountryDriver");
        // Specify the data types of the output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);
        // Specify the Mapper and Reducer classes
        job_conf.setMapperClass(SalesCountryMapper.class);
        job_conf.setReducerClass(SalesCountryReducer.class);
        // Specify the input and output formats
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);
        // Set the input and output directories from the command-line arguments:
        // args[1] = input directory on HDFS, args[2] = output directory to be created for the result
        FileInputFormat.setInputPaths(job_conf, new Path(args[1]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[2]));
        my_client.setConf(job_conf);
        try {
            // Run the job
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
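The job is packaged and run in the same way as Program #1. For example, assuming the classes are built into a jar named SalesCountry.jar on the Desktop and the sales file has already been copied to /Sales/input on HDFS (the jar name and paths here are only placeholders):
hadoop jar /home/mkp/Desktop/SalesCountry.jar SalesCountryDriver /Sales/input /Sales/output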
Program #3: Word count program
WordCountMapper.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private final static LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Split on runs of whitespace so that multiple spaces do not produce empty tokens
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                // Emit (word, 1) for every word in the line
                context.write(new Text(word), one);
            }
        }
    }
}
WordCountReducer.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>
{
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException
    {
        long sum = 0;
        // Add up all the counts emitted for this word
        for (LongWritable value : values)
        {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
WordCountJob.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
public class WordCountJob implements Tool {

    private Configuration conf;

    @Override
    public Configuration getConf()
    {
        return conf;
    }

    @Override
    public void setConf(Configuration conf)
    {
        this.conf = conf;
    }

    @Override
    public int run(String[] args) throws Exception
    {
        Job wordcountjob = Job.getInstance(getConf());
        wordcountjob.setJobName("WordCountJob");
        wordcountjob.setJarByClass(this.getClass());
        // Mapper and Reducer classes for the job
        wordcountjob.setMapperClass(WordCountMapper.class);
        wordcountjob.setReducerClass(WordCountReducer.class);
        // Key/value types of the map output and of the final output
        wordcountjob.setMapOutputKeyClass(Text.class);
        wordcountjob.setMapOutputValueClass(LongWritable.class);
        wordcountjob.setOutputKeyClass(Text.class);
        wordcountjob.setOutputValueClass(LongWritable.class);
        // args[1] = input path on HDFS, args[2] = output directory to be created for the result
        FileInputFormat.setInputPaths(wordcountjob, new Path(args[1]));
        FileOutputFormat.setOutputPath(wordcountjob, new Path(args[2]));
        return wordcountjob.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new WordCountJob(), args);
        System.exit(res);
    }
}
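As with the previous programs, the job is packaged into a jar and run with the input and output paths as arguments; the jar name and HDFS paths below are placeholders only:
hadoop jar /home/mkp/Desktop/WordCount.jar WordCountJob /WordCount/input /WordCount/output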