Saturday, July 19, 2014

Hadoop | MapReduce API

MapReduce is a massively scalable, parallel processing framework that works in tandem with HDFS. MapReduce and Hadoop enable compute to execute at the location of the data, rather than moving data to the compute location; data storage and computation coexist on the same physical nodes in the cluster.

Hadoop MapReduce is an open-source implementation of the MapReduce programming model.

MapReduce (older API implementation from Hadoop 0.20, covered here just for conceptual reference)

Key Constituents

Mapper Class
The Mapper class is the base class for all mappers. We need to extend org.apache.hadoop.mapreduce.Mapper for our application mapper.

The Mapper class has the following structure:
public class Mapper<K1, V1, K2, V2> {
     protected void map(K1 key, V1 value, Context context) throws IOException, InterruptedException {...}
     protected void setup(Context context) throws IOException, InterruptedException
     //This method is called once before any key/value pairs are presented to the map method. The default implementation does nothing.
     protected void cleanup(Context context) throws IOException, InterruptedException
     //This method is called once after all key/value pairs have been presented to the map method. The default implementation does nothing.
     public void run(Context context) throws IOException, InterruptedException
     //This method controls the overall flow of task processing within a JVM.
     //The default implementation calls the setup method once before repeatedly calling the map method for each key/value pair in the split,
     //and then finally calls the cleanup method.
}

Internally, the org.apache.hadoop.mapreduce.Mapper class has:
public void run(Context context) throws IOException, InterruptedException{
     setup(context);
     while (context.nextKeyValue()){
          map(context.getCurrentKey(), context.getCurrentValue(), context);
     }
     cleanup(context);
}
Hadoop-provided mapper implementations are found in the org.apache.hadoop.mapreduce.lib.map and org.apache.hadoop.mapred.lib packages. They include the following:
  • InverseMapper: This outputs (value,key) 
  • TokenCounterMapper: This counts the number of discrete tokens in each line of input 
  • IdentityMapper: No change in output, used for merging etc.
Note that the mapper output (intermediate data) is stored on the local file system (NOT HDFS) of each individual mapper node. This is basically a temporary directory, which can be configured. The intermediate data is cleaned up after job completion.
  • The total number of map tasks equals the number of blocks in the input data to be processed.
  • The maximum number of map/reduce slots per node can be configured by using:
    • mapred.tasktracker.map.tasks.maximum
    • mapred.tasktracker.reduce.tasks.maximum
    Since these are per-node settings, this makes it possible to control the maximum number of mappers/reducers executing in parallel across the entire cluster.
  • The number of maps is usually driven by the number of DFS blocks in the input files, although this leads some people to adjust their DFS block size in order to adjust the number of maps.
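As an illustration, here is a minimal word-count style mapper. This is just a sketch: the SampleMapper name matches the driver shown later in this post, and it assumes the default TextInputFormat, which supplies the byte offset of each line as the key and the line itself as the value.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//Hypothetical SampleMapper: emits a (word, 1) pair for every token in each line of input
public class SampleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
     private static final IntWritable ONE = new IntWritable(1);
     private final Text word = new Text();

     @Override
     protected void map(LongWritable key, Text value, Context context)
               throws IOException, InterruptedException {
          //value holds one line of input text; tokenize it and emit each token with a count of 1
          StringTokenizer tokenizer = new StringTokenizer(value.toString());
          while (tokenizer.hasMoreTokens()) {
               word.set(tokenizer.nextToken());
               context.write(word, ONE);   //intermediate (K2, V2) pair
          }
     }
}
Reusing the Text and IntWritable objects across calls to map avoids creating new objects for every input record.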
Reducer Class
The Reducer class is the base class for all reducers (in the same manner as the Mapper). We need to extend org.apache.hadoop.mapreduce.Reducer for our application reducer.

The Reducer class has the following structure:
public class Reducer<K2, V2, K3, V3> {
     protected void reduce(K2 key, Iterable<V2> values, Context context) throws IOException, InterruptedException {...}
     protected void setup(Context context) throws IOException, InterruptedException
     protected void cleanup(Context context) throws IOException, InterruptedException
     public void run(Context context) throws IOException, InterruptedException
}
Internally, the org.apache.hadoop.mapreduce.Reducer class has:
public void run(Context context) throws IOException, InterruptedException {
     setup(context);
     while (context.nextKey()) {
          reduce(context.getCurrentKey(), context.getValues(), context);
     }
     cleanup(context);
}
The optimal number of reducers is related to the total number of available reducer slots in your cluster. The total number of slots is found by multiplying the number of nodes in the cluster by the number of slots per node (which is determined by the value of the mapred.tasktracker.reduce.tasks.maximum property).
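For completeness, here is a matching reducer to pair with the mapper sketched above, again assuming the SampleReducer name used by the driver below; it simply sums the counts emitted for each word.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//Hypothetical SampleReducer: sums the counts emitted by SampleMapper for each word
public class SampleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
     private final IntWritable result = new IntWritable();

     @Override
     protected void reduce(Text key, Iterable<IntWritable> values, Context context)
               throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable value : values) {   //all values for this key, gathered from all mappers
               sum += value.get();
          }
          result.set(sum);
          context.write(key, result);   //final (K3, V3) pair, written to HDFS
     }
}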
Combiner Class
  • The combiner does not have its own interface: a combiner must have the same signature as the reducer and hence also subclasses the Reducer class from the org.apache.hadoop.mapreduce package. The effect is to perform a mini-reduce on the mapper side for the output destined for each reducer.
  • Hadoop does not guarantee whether the combiner will be executed. At times it may not be executed at all, while at other times it may be used once, twice, or more times, depending on the size and number of output files generated by the mapper for each reducer.
  • One constraint that a combiner has, unlike a reducer, is that its input and output key and value types must match the output types of the mapper. Combiners can only be used for functions that are COMMUTATIVE (a.b = b.a) and ASSOCIATIVE (a.(b.c) = (a.b).c); see the sketch after this list. This also means that combiners may operate only on a subset of keys and values, or may not execute at all.
  • Reducers can get data from multiple mappers as part of the partitioning process, whereas a combiner can only get its input from one mapper.
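Because summing counts is both commutative and associative, the reducer itself can double as the combiner. As a minimal sketch, the only addition needed in the driver (shown in full in the next section) is the following line:
//Registering the reducer as a combiner; valid here only because addition is commutative
//and associative, and the combiner's output types match the mapper's output types
job.setCombinerClass(SampleReducer.class);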
Driver Class
Our mapper and reducer implementations contain all the processing logic, but one more piece of code is required to actually perform the MapReduce job: the driver, which communicates with the Hadoop framework and specifies the configuration elements needed to run the job. This involves aspects such as telling Hadoop which Mapper and Reducer classes to use, where to find the input data and in what format, and where to place the output data and how to format it.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SampleDriver {
     public static void main(String[] args) throws Exception {
          //Create a Configuration object that is used to set other options
          Configuration conf = new Configuration();
          //Create the object representing the job
          Job job = new Job(conf, "SampleJob");
          //Set the name of the main class in the job jar file
          job.setJarByClass(SampleDriver.class);
          //Set the mapper class
          job.setMapperClass(SampleMapper.class);
          //Set the reducer class
          job.setReducerClass(SampleReducer.class);
          //Set the partitioner class
          job.setPartitionerClass(SamplePartitioner.class);
          //Set the types for the final output key and value
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);
          //Set input and output file paths
          FileInputFormat.addInputPath(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
          //Execute the job and wait for it to complete
          System.exit(job.waitForCompletion(true) ? 0 : 1);
     }
}
Partitioner Class
Partitioners are application code that defines how keys are assigned to reduce tasks.
  • Default partitioning spreads keys evenly, but randomly. It uses key.hashCode() % num_reduces. 
  • Custom partitioning is generally required, for example, to produce a total order in the output. A custom partitioner should extend the Partitioner abstract class and is set on the job by calling job.setPartitionerClass(SamplePartitioner.class); a sketch of one such partitioner is given at the end of this section.
public abstract class Partitioner<Key, Value> {
     public abstract int getPartition(Key key, Value value, int numPartitions);
}
A few things to be noted in the context of the Partitioner:
  • The number of resultant output files generated equals the number of reducers.
  • One of the implicit guarantees of the Reduce interface is that a single reducer will be given all the values associated with a given key.
  • With multiple reduce tasks running across a cluster, each mapper output must therefore be partitioned into the separate outputs destined for each reducer. These partitioned files are stored on the local node file system. 
  • The number of reduce tasks across the cluster is not as dynamic as that of mappers, and indeed we can specify the value as part of our job submission. Each TaskTracker therefore knows how many reducers are in the cluster and from this how many partitions the mapper output should be split into.
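As mentioned above, a custom partitioner extends the Partitioner abstract class. Below is a minimal sketch, assuming a hypothetical SamplePartitioner that buckets Text keys by their first letter so that the reducer outputs, taken in order, give a rough total order:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

//Hypothetical SamplePartitioner: routes each key to a reducer based on its first letter
public class SamplePartitioner extends Partitioner<Text, IntWritable> {
     @Override
     public int getPartition(Text key, IntWritable value, int numPartitions) {
          if (key.getLength() == 0) {
               return 0;   //empty keys go to the first reducer
          }
          char first = Character.toLowerCase(key.toString().charAt(0));
          if (first < 'a' || first > 'z') {
               return 0;   //non-alphabetic keys also go to the first reducer
          }
          //spread 'a'..'z' evenly across the configured number of reduce tasks
          return ((first - 'a') * numPartitions) / 26;
     }
}
Note that getPartition must always return a value between 0 and numPartitions - 1, and all records with the same key must be sent to the same partition.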

Data Flow

At a high level, the data flows as follows: input splits are read and processed by the mappers; the intermediate (key, value) pairs are partitioned per reducer and written to the local file system of the mapper nodes; they are then copied across the network, merged and sorted, and presented to the reducers, which write the final output to HDFS.

Helpful Links

https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
https://developer.yahoo.com/hadoop/tutorial/module4.html
https://www.youtube.com/watch?v=xYnS9PQRXTg
https://hadoopi.wordpress.com/2013/05/27/understand-recordreader-inputsplit
http://nixustechnologies.com/2014/07/hadoop-mapreduce-flow-how-data-flows-in-mapreduce
http://stackoverflow.com/questions/19473772/data-block-size-in-hdfs-why-64mb