Map Reduce – Advanced Quiz


(Map Reduce – Advanced Quiz)

Q#1

In Mapreduce , Join operations are resource hungry , in order to provide better performance Map-side joins provides an option to create joins between 2 text files when one of the data set is small. Below is a code snippet to show how to create a map-side join , question is how to get the smaller data set to load into mapper in a map side join.

public void map(Object key, Text value, Context context)throws IOException, InterruptedException {

Map<String, String> parsed = transformXmlToMap(value.toString());

String userId = parsed.get(“UserId”);

String userInformation = userIdToInfo.get(userId);

if (userInformation != null) {

outvalue.set(userInformation);

context.write(value, outvalue);

} else if (joinType.equalsIgnoreCase(“leftouter”)) {

context.write(value, EMPTY_TEXT);

}

}

In order to complete this map side joins in above mapper code , the smaller data set has to be made available ,which of the following answers describe the best way to have the data available.

Option 1 : No need to send any additional data , the MR framework will take care of the input paths.

Option 2: In the driver code have 2 separate mapper inputs using the following code

job.setInputPaths(job, new Path(path1),new Path(path2);

Option 3: Create a distributed cache and load the smaller data set in the distributed cache and have the bigger data set

Loaded as input as the below snippet:

DistributedCache.addLocalCacheFile(job,new Path(path1));

And lookup the cache file in setup of mapper as

public void setup(Context context) throws IOException, InterruptedException {

Path[] files =

DistributedCache.getLocalCacheFiles(context.getConfiguration());

for (Path p : files) {

BufferedReader rdr = new BufferedReader(

new InputStreamReader(

new GZIPInputStream(new FileInputStream(

new File(p.toString())))));

String line = null;

while ((line = rdr.readLine()) != null) {

Map<String, String> parsed = transformXmlToMap(line);

String userId = parsed.get(“Id”);

userIdToInfo.put(userId, line);

}

}

Option 4: Get multiple data sources into two separate mappers and finally use a reducer to get the joins.


Q#2

Hadoop is used for non-relational data , sometimes its important to manage relationship between 2 data sets.

The simplest way to manage the relationship is by using Key-value pairs in Mapreduce jobs.

Sometimes its imperative that relationship between 2 data sets needs to be managed using foreign keys. The

Best way to create the foreign key is to introduce secondary sorting in MapReduce. Which of the following is the best way to create a secondary sorting

Option 1 :

Initially use MultipeInputs to load the 2 data sets using

MultipleInputs.addInputPath(job,new Path(path1),TextInputFormat,MapperClass );

MultipleInputs.addInputPath(job,new Path(path2),TextInputFormat,MapperClass);

In the Mapper add key as

context.write(key1+”,”+key2,value);

Create a Partitioner and override the partitioner code as :

@Override

public int getPartition(Text arg0, Text arg1, int arg2) {

return arg0.toString().split(“,”)[0].hashCode()%arg2;

}

And finally create a group Comparator overriding compare method:

@Override

public int compare(WritableComparable a, WritableComparable b) {

Text s1 = (Text) a;

Text s2 = (Text) b;

return s1.toString().split(“,”)[0].compareTo(s2.toString().split(“,”)[0]);

}

Option 2 : Secondary sorting is not possible in mapreduce as HDFS does not maintain relationship between

Data.

Option 3: Create a new mapreduce program and write output as

context.write(key1+key2,value);

and this will automatically get similar data sets to be loaded into reducer.


Q#3

Data needs to be transmitted and transferred over network during any MapReduce job. The data that needs to be transferred during any Mapreduce job across the cluster is the Key-Value pair. For keys its important that the data transferred between nodes can also be sorted and shuffled before it reaches the reducer. What is the way in which keys can be transferred across the nodes.

Option 1: No need to make special requirements for keys, use any java data types to transfer the keys.

Option 2: Use a writable object like mentioned below

class A extends implements Writable{

Text a;

Text b;

}

Option 3: Use a Writable Comparable class that can override compare to method :

class A extends implements WritableComparable {

Text a;

Text b;

public int compareTo(KeyCustom arg0) {

return placeOfBirth.compareTo(arg0.placeOfBirth);

}

}


Q#4

Data in Big Data world is huge, for analytics sometimes its important to get a high level aggregated view of the

Data. In RDBMS the best way to get a top 10 view is by :

Select * from employee order by salary limit 10;

Which of the following Mapreduce program depicts this top 10 view :

Option 1 : Create a Mapreduce program as follows :

private TreeMap<Integer, Text> repToRecordMap = new TreeMap<Integer, Text>();

public void map(Object key, Text value, Context context)throws IOException, InterruptedException {

Map<String, String> parsed = value.split(“,”);

String userId = parsed[0];

String reputation = parsed[1];

repToRecordMap.put(Integer.parseInt(reputation), new Text(value));

if (repToRecordMap.size() > 10) {

repToRecordMap.remove(repToRecordMap.firstKey());

}

}

public static class TopTenReducer extendsReducer<NullWritable, Text, NullWritable, Text> {

private TreeMap<Integer, Text> repToRecordMap = new TreeMap<Integer, Text>();

public void reduce(NullWritable key, Iterable<Text> values,Context context) throws IOException,

InterruptedException {

for (Text value : values) {

Map<String, String> parsed = value.split(“,”);

repToRecordMap.put(Integer.parseInt(parsed[0]),new Text(value));

if (repToRecordMap.size() > 10) {

repToRecordMap.remove(repToRecordMap.firstKey());

}

}

for (Text t : repToRecordMap.descendingMap().values()) {

context.write(NullWritable.get(), t);

}

}

Option 2: Mapreduce cannot be used to find the top 10 from a text data loaded into HDFS.

Option 3:

public void map(Object key, Text value, Context context)throws IOException, InterruptedException {

Map<String, String> parsed = value.split(“,”);

String userId = parsed[0];

String reputation = parsed[1];

context.write(NullWritable.get(),new Text(userID,reputation));

}

public void reduce(NullWritable key, Iterable<Text> values,Context context) throws IOException,

InterruptedException {

for (Text value : values) {

context.write(NullWritable.get(), value);

}

}


Q#5

Counters in Mapreduce has some of the best uses to perform summarization of the data. Some of the examples of using a counter in Mapreduce is to get a overall count of records , sum of similar records and count uniqueness of records. The code below describes a counting algorithm in Mapreduce code, find the best way to retrieve the counter in any Mapreduce code.

public void map(Object key, Text value, Context context)throws IOException, InterruptedException {

Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());

String location = parsed.get(“Location”);

if (location != null && !location.isEmpty()) {

String[] tokens = location.toUpperCase().split(“\\s”);

boolean unknown = true;

for (String state : tokens) {

if (states.contains(state)) {

context.getCounter(STATE_COUNTER_GROUP, state)

.increment(1);

unknown = false;

break;

}

}

if (unknown) {

context.getCounter(STATE_COUNTER_GROUP, UNKNOWN_COUNTER)

.increment(1);

}

} else {

context.getCounter(STATE_COUNTER_GROUP,

NULL_OR_EMPTY_COUNTER).increment(1);

}

}

Option 1: use distributed cache to lookup the counter in the driver code

job.getDistributedCache(countername);

Option 2: In the driver code check the completion of the job and get the counter as follows

int code = job.waitForCompletion(true) ? 0 : 1;

if (code == 0) {

for (Counter counter : job.getCounters().getGroup(

CountNumUsersByStateMapper.STATE_COUNTER_GROUP)) {

System.out.println(counter.getDisplayName() + “\t”+ counter.getValue());

}

Option 3: Use a reducer and get the context in the setup method

public void reduce(NullWritable key, Iterable<Text> values,Context context) throws IOException,

InterruptedException {

for (Counter counter : context.job.getCounters().getGroup(

CountNumUsersByStateMapper.STATE_COUNTER_GROUP)) {

System.out.println(counter.getDisplayName() + “\t”+ counter.getValue());

}

}


Q#6

A group comparator in MapReduce ensures that data which are divided on natural key and argument key can redirect only one key type to reducer types. Looking into the code below identify what the group comparator is doing :

public class GroupComparator extends WritableComparator{

public GroupComparator()

{

super(SentimentKey.class,true);

}

@Override

public int compare(WritableComparable a, WritableComparable b) {

CompositKey s1 = (CompositKey) a;

CompositKey s2 = (CompositKey) b;

return s1.naturalKey.compareTo(s2. naturalKey);

}

}

Option 1: This code redirects entire compositKey data as input key to Reducer .

Option 2: This code will have no effect as the final processing of which data will go to which reducer is done by Partitioner.

Option 3: This code ensures that only natural key is used to redirect the keys to reducer.


Q#7

A MapReduce meta-pattern provides a way to chain jobs together , based on the below code snippet find the difference between the 2 patterns:

Main method {

boolean flag = job1.waitforCompletion(true);

if(flag==true)

{

job2.waitForCompletion(true);

}

}

Main Method{

job.1submit();

job2.submit();

while(! Job1.iscomplete() || ! Job2.isComplete())

Thread.sleep(5000);

}

Option 1: In case 1 job2 will wait for job1 and job2 will only start after successful completion of job 1 and in

Case 2 job1 and job2 will execute in parallel and the driver code will wait till the both jobs are complete.

Option 2: No matter what sequence the jobs are scheduled , jobtracker will ensure the execution process.

Option 3: Incase of case 1 job1 and job 2 will execute in sequence irrespective of the success of any of the jobs. Incase

2 both jobs will be submitted in parallel.


Q#8

Data passed to Mapper code depends on the InputFormat, different business cases requires using different Inputformat. In order to read a whole file at a time and send each file as input to a mapper we normally write

Custom input format. In order to write a input format we need to implement a Record reader, based on the code below find the best explanation of the record reader implemented.

private class WholeFileRecordReader extends RecordReader<NullWritable, Text>

{

@Override

public void initialize(InputSplit arg0, TaskAttemptContext arg1)

throws IOException, InterruptedException {

this.split = (FileSplit) arg0;

this.conf = arg1.getConfiguration();

}

@Override

public boolean nextKeyValue() throws IOException, InterruptedException {

if(!processed)

{

byte[] content = new byte[(int) split.getLength()];

Path file = split.getPath();

System.out.println(“*********Name*************”+file.getName());

FileSystem fs = file.getFileSystem(conf);

FSDataInputStream fis = null;

try

{

fis = fs.open(file);

IOUtils.readFully(fis, content, 0, content.length);

String decoded = new String(content, “UTF-8”);

value = new Text(decoded);

}

finally

{

IOUtils.closeStream(fis);

}

processed = true;

return true;

}

return false;

}

Option 1: This RecordReader tries to provide the default implementation that writes out one line of data at a time .

Option 2: This record reader implements a whole file input format reader that reads a whole file at a time and writes out One file at a time to mapper.

Option 3: This record reader implements a paragraph input format that reads one paragraph at a time and writes out One file at a time to mapper.


Q#9

Partitioner in Mapreduce ensures which data moves from mapper to reducer. Following code specify a partitioner that provides which data is moved to which reducer. Find the best explanation for the code below:

public class Join1Partitioner extends Partitioner<IntWritable, Text>{

@Override

public int getPartition(IntWritable arg0, Text arg1, int arg2) {

try

{

return (arg0%2).hashCode()%arg2;

}

catch(Exception ex) {

return 0;

}

}

}

Option 1: This is an implementation of a Partitioner that partitions the data bases on even and odd keys.

Option 2: This is an default implementation of a Partitioner that sends keys to respective reducer.

Option 3: This is an implementation of Partitioner that separates keys on the hashcode and sends the pair to

Separate reducer.


Q#10

In Mapreduce , it becomes important to access parameter information in the mapper which can be passed from the driver code. In most cases the parameter provides some associated information passes by the programmer. Given the driver code find the best way to retrieve the information in mapper :

Configuration conf = new Configuration();

conf.set(“test”, “123”);

Job job = new Job(conf);

Option 1:

protected void setup(Context context) throws IOException,

InterruptedException {

int value = (Integer)context.getConfiguration().get(“test”);

}

Option 2:

protected void cleanup(Context context) throws IOException,

InterruptedException {

int value = (Integer)context.get(“test”);

}

Option 3: public void map(Object key, Text value, Context context) throws IOException, InterruptedException

{

int value = (Integer)context.get(“test”);

}