HDFS:
HDFS Architecture:
Map Reduce:
Name Node:
Data Node:
Secondary Name Node:
Job Tracker:
Task Tracker:
Record Reader:
Ex. <Key, Value>
<1, What do you mean by object>
<2, What do you know about Java>
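A minimal sketch of how these pairs reach user code (note that with the default TextInputFormat the RecordReader actually supplies the byte offset of each line as the key rather than a line number; the class name EchoMapper is only for illustration):
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Echoes the <key, value> pairs delivered by the RecordReader, unchanged.
public class EchoMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        // key:   position of the line within the input split (supplied by the RecordReader)
        // value: the line itself, e.g. "What do you mean by object"
        context.write(key, value);
    }
}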
Record Writer:
Ex. What 2
do 2
you 2
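Each pair a reducer emits with context.write(key, value) is handed to the RecordWriter of the job's OutputFormat; with the default TextOutputFormat it is written to the reducer's output file (e.g. part-r-00000) as the key, a tab, and the value, one pair per line, which is exactly the layout shown above.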
Combiner:
How does the Combiner work?
Although the Combiner is optional, it groups the intermediate data by key on the map side before the Reduce phase, which makes the data easier to process.
Map output (record 1): <What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
Map output (record 2): <What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
Combiner output: <What,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1> <know,1> <about,1> <Java,1>
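The complete WordCount program below performs exactly this combining step by reusing its reducer class, IntSumReducer, as the combiner (see job.setCombinerClass below).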
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
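Once compiled and packaged into a jar, the job is typically submitted with a command along the lines of hadoop jar wc.jar WordCount <input path> <output path> (the jar name here is just a placeholder); the two paths become args[0] and args[1] in main().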
Partitioner:
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends Partitioner<Text, Text>
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = Job.getInstance(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(), ar);
System.exit(res);
}
}
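The job above assumes tab-separated input records in which (0-based) field 2 is the age, field 3 the gender, and field 4 the salary; the earlier fields are not examined by this job. CaderPartitioner routes each record to one of the three reducers by age group (<=20, 21-30, over 30), and ReduceClass then emits the maximum salary per gender within each partition.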
UNIT - IV
Hadoop I/O
Why does Hadoop use Writable(s)?
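In short: Writable is Hadoop's own serialization format. It is compact and fast to serialize and deserialize, which matters because every key and value is serialized when it is shuffled between nodes and when it is written to or read from disk.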
Writable Interface
The Writable interface defines two methods: one for writing an object's state to a DataOutput binary stream, and one for reading its state back from a DataInput binary stream.
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable
{
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
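A small helper pair (a sketch, not part of the Hadoop API) shows the two methods in use: write() pushes an object's state onto a binary stream, and readFields() repopulates an object from one.
// Sketch: serialize any Writable to a byte array using its write() method.
// (Assumes imports from java.io and org.apache.hadoop.io.Writable.)
public static byte[] serialize(Writable writable) throws IOException
{
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut);       // the Writable writes its own fields
    dataOut.close();
    return out.toByteArray();
}
// Sketch: repopulate a Writable from a byte array using readFields().
public static void deserialize(Writable writable, byte[] bytes) throws IOException
{
    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
    DataInputStream dataIn = new DataInputStream(in);
    writable.readFields(dataIn);   // the Writable reads its fields back
    dataIn.close();
}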
WritableComparable and comparators
package org.apache.hadoop.io;
public interface WritableComparable<T> extends Writable, Comparable<T>
{
}
package org.apache.hadoop.io;
import java.util.Comparator;
public interface RawComparator<T> extends Comparator<T>
{
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
RawComparator<IntWritable> c = WritableComparator.get(IntWritable.class);
IntWritable w1 =new IntWritable(164);
IntWritable w2 =new IntWritable(67);
assertThat(c.compare(w1,w2), greaterThan(0));
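The same comparator can also compare the serialized forms directly, which is the point of RawComparator (using the serialize() helper sketched in the Writable Interface section):
byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(c.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));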
Writable classes
Writable wrappers for java primitives
IntWritable iw= new IntWritable();
iw.set(154);
IntWritable iw = new IntWritable(154);
How do you choose between a fixed length and a variable length encoding?
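Broadly: a fixed-length encoding (IntWritable, LongWritable) suits values distributed fairly uniformly over the whole value space, such as hashes, while a variable-length encoding (VIntWritable, VLongWritable) usually saves space because most values are small and are stored in fewer bytes. A small sketch, reusing the serialize() helper from above:
byte[] fixed = serialize(new IntWritable(163));      // always 4 bytes
byte[] variable = serialize(new VIntWritable(163));  // only 2 bytes for this value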
Text
Text t=new Text("hadoop");
t.set("pig");
t.toString();
BytesWritable
BytesWritable b = new BytesWritable(new byte[] {3, 5});
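Its serialized form is a 4-byte integer length followed by the bytes themselves, so the two-byte value above occupies six bytes in total (00 00 00 02 03 05). A sketch, reusing the serialize() helper from earlier:
byte[] bytes = serialize(b); // 6 bytes: 4-byte length prefix (2) followed by 03 05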
NullWritable
ObjectWritable and GenericWritable
Writable collections
ArrayWritable aw = new ArrayWritable(Text.class);
public class TextArrayWritable extends ArrayWritable
{
public TextArrayWritable()
{
super(Text.class);
}
}
MapWritable src = new MapWritable();
src.put(new IntWritable(1), new Text("cat"));
src.put(new VIntWritable(2), new LongWritable(163));
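Because MapWritable implements java.util.Map<Writable, Writable>, entries can be read back with get(), casting to the expected type; a minimal sketch:
Text first = (Text) src.get(new IntWritable(1));                   // "cat"
LongWritable second = (LongWritable) src.get(new VIntWritable(2)); // 163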
Implementing a custom Writable
Prog: The Writable implementation that stores a pair of Text objects
import java.io.*;
import org.apache.hadoop.io.*;
public class TextPair implements WritableComparable<TextPair>
{
private Text first;
private Text second;
public TextPair()
{
set(new Text(), new Text());
}
public TextPair(String first, String second)
{
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second)
{
set(first, second);
}
public void set(Text first, Text second)
{
this.first = first;
this.second = second;
}
public Text getFirst()
{
return first;
}
public Text getSecond()
{
return second;
}
@Override
public void write(DataOutput out) throws IOException
{
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException
{
first.readFields(in);
second.readFields(in);
}
@Override
public int hashCode()
{
return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object o)
{
if (o instanceof TextPair)
{
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
@Override
public String toString()
{
return first + "\t" + second;
}
@Override
public int compareTo(TextPair tp)
{
int cmp = first.compareTo(tp.first);
if (cmp != 0)
{
return cmp;
}
return second.compareTo(tp.second);
}
}
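A quick round trip, reusing the serialize()/deserialize() sketches from the Writable Interface section, illustrates the contract that readFields() must be able to read back whatever write() produced:
TextPair original = new TextPair("hadoop", "pig");
byte[] bytes = serialize(original);
TextPair copy = new TextPair();
deserialize(copy, bytes);
// copy.equals(original) now holds, and TextPair can serve as a MapReduce key
// because it is a WritableComparable.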
Implementing a RawComparator for speed
Prog: A RawComparator for comparing TextPair byte representations
public static class Comparator extends WritableComparator
{
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public Comparator()
{
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
{
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if (cmp != 0)
{
return cmp;
}
return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2);
}
catch (IOException e)
{
throw new IllegalArgumentException(e);
}
}
}
static
{
WritableComparator.define(TextPair.class, new Comparator());
}
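Registering the comparator in a static block means that WritableComparator.get(TextPair.class), and hence the MapReduce framework when it sorts TextPair keys, picks up this raw byte-level comparator automatically instead of deserializing objects just to compare them.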
Custom comparators
Prog: A custom RawComparator for comparing the first field of TextPair byte representations
public static class FirstComparator extends WritableComparator
{
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public FirstComparator()
{
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
{
try
{
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
}
catch (IOException e)
{
throw new IllegalArgumentException(e);
}
}
@Override
public int compare(WritableComparable a, WritableComparable b)
{
if (a instanceof TextPair && b instanceof TextPair)
{
return ((TextPair) a).first.compareTo(((TextPair) b).first);
}
return super.compare(a, b);
}
}
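A first-field comparator like this is typically plugged in for a secondary-sort pattern, for example as the grouping comparator so that all values sharing the same first field reach a single reduce() call. A sketch, assuming FirstComparator is declared as a nested class of TextPair:
job.setGroupingComparatorClass(TextPair.FirstComparator.class); // group reduce input by the first field only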