Wednesday, July 16, 2014

Hadoop | MapReduce

In the previous post, we looked at the first component of the Hadoop framework, i.e. HDFS, and its key features. Now we will look at the concept of the MapReduce framework.

Before we start with MapReduce, we need to understand a very important core concept of the Hadoop (HDFS + MapReduce) framework known as Data Locality.
  • The data locality concept can be described as "bringing the compute to the data." In other words, whenever we run a MapReduce program on a particular part of HDFS data, we want to run that program on the node, or machine, that actually stores this data in HDFS. Doing so allows processes to run much faster, since it prevents us from having to move large amounts of data around.
  • Hadoop tends to run map tasks on nodes where the data is present locally, to avoid network and inter-node communication latency as much as possible. Since the input data is split into pieces and fed to map tasks, it is preferred that all the data given to a particular map task is available on a single node.
Since HDFS only guarantees that data of a size equal to its block size (64 MB) is present on one node, it is advisable to keep the split size equal to the HDFS block size so that the map task can take advantage of this data localization.
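To make the block/split relationship concrete, here is a minimal Java sketch (an illustration, not Hadoop source code) of the split-size rule that Hadoop's FileInputFormat follows: splitSize = max(minSize, min(maxSize, blockSize)). With the default minimum and maximum values, the split size comes out equal to the HDFS block size, which is exactly what lets each map task read its split from a local block.

    public class SplitSizeSketch {
        // Mirrors FileInputFormat's rule: max(minSize, min(maxSize, blockSize)).
        static long computeSplitSize(long blockSize, long minSize, long maxSize) {
            return Math.max(minSize, Math.min(maxSize, blockSize));
        }

        public static void main(String[] args) {
            long blockSize = 64L * 1024 * 1024;   // 64 MB HDFS block (Hadoop 1.x default)
            long minSize   = 1L;                  // default minimum split size
            long maxSize   = Long.MAX_VALUE;      // default maximum split size
            // With the defaults, the split size equals the block size, so a map task
            // can be scheduled on the node that already holds that block.
            System.out.println("split size = " + computeSplitSize(blockSize, minSize, maxSize));
        }
    }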

MapReduce is a framework/programming-model that allows developers to write programs that process massive amounts of unstructured data in parallel across a distributed cluster of processors or stand-alone computers.
A MapReduce program is composed of a map() function that performs filtering and sorting (such as sorting students by first name into queues, one queue for each name) and a reduce() function that performs a summary operation.

A typical Hadoop usage pattern involves three stages (a short sketch of loading and retrieving data follows this list):
  • Loading data into HDFS
  • MapReduce operations
  • Retrieving results from HDFS
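As an illustration of the first and last stages, the sketch below uses the HDFS FileSystem Java API to copy a file into HDFS and to copy a result file back to the local filesystem. The paths and file names are placeholders, not taken from this post.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsCopySketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();   // picks up core-site.xml (fs.default.name)
            FileSystem fs = FileSystem.get(conf);

            // Stage 1: load local data into HDFS
            fs.copyFromLocalFile(new Path("/tmp/input.txt"),
                                 new Path("/user/hadoop/input/input.txt"));

            // Stage 3: retrieve MapReduce results from HDFS back to the local filesystem
            fs.copyToLocalFile(new Path("/user/hadoop/output/part-r-00000"),
                               new Path("/tmp/result.txt"));

            fs.close();
        }
    }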
Let's start with Apache Hadoop MapReduce concepts.

Key Features

  • Fine-grained Map and Reduce tasks
    • Improved load balancing
    • Faster recovery from failed tasks 
  • Automatic re-execution on failure 
    • In a large cluster, some nodes are always slow or flaky
    • This introduces long tails or failures in the computation
    • The framework re-executes failed tasks
  • Locality optimizations
    • With big data, bandwidth to data is a problem
    • MapReduce + HDFS is a very effective solution
    • MapReduce queries HDFS for locations of input data
    • Map tasks are scheduled local to the inputs when possible

Programming Model

  • MapReduce uses key-value input/output
  • The complete dataset is broken into key-value pairs
  • Map and Reduce functions
  • The user defines the map() and reduce() functions such that:
    • In map phase:
      • (K1,V1) => list (K2,V2) 
    • In reduce phase: 
      • (K2,list(V2)) => list (K3,V3) 
  • The map function is applied to every input key-value pair, i.e. to each (K1,V1)
  • The map function generates intermediate key-value pairs, i.e. a list of (K2,V2)
  • The intermediate key-values are sorted and grouped by key, i.e. (K2,list(V2)) pairs are produced. This is where the real magic happens: all the values corresponding to the same key are grouped together.
  • Reduce is applied to the sorted and grouped intermediate key-values, i.e. to each (K2,list(V2))
  • Reduce emits the result key-values, i.e. a list of (K3,V3) (see the word-count sketch below)
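The classic word-count example makes these signatures concrete: (K1,V1) is a (byte offset, line of text) pair, the intermediate (K2,V2) pairs are (word, 1), and the final (K3,V3) pairs are (word, total count). The sketch below is written against the org.apache.hadoop.mapreduce API; the class names are illustrative.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WordCount {

        // Map phase: (K1,V1) = (byte offset, line of text) => list(K2,V2) = (word, 1)
        public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            private static final IntWritable ONE = new IntWritable(1);
            private final Text word = new Text();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                for (String token : value.toString().split("\\s+")) {
                    if (!token.isEmpty()) {
                        word.set(token);
                        context.write(word, ONE);   // emit intermediate (word, 1)
                    }
                }
            }
        }

        // Reduce phase: (K2,list(V2)) = (word, [1,1,...]) => list(K3,V3) = (word, total count)
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable v : values) {
                    sum += v.get();   // values for the same word arrive grouped together
                }
                context.write(key, new IntWritable(sum));
            }
        }
    }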

The MapReduce framework implicitly takes care of the following:
  • Distributed processing and coordination 
  • Breaking down jobs into smaller chunks called tasks, which are scheduled across the cluster (all available worker nodes)
  • It strives to place tasks on the nodes that host the segment of data to be processed by that specific task 
  • Code is moved to where the data is, known as data locality 
  • Failures are expected, given the number of machines and the amount of data involved, so failed tasks are automatically retried on other machines
  • The shuffle-and-sort barrier rearranges and moves the data between machines without any manual intervention (see the partitioner sketch below)
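As a small illustration of how the shuffle decides which reducer receives a given key, the sketch below mirrors Hadoop's default HashPartitioner. The class name is illustrative; a job normally just uses the default partitioner.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class WordPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            // The same key always lands in the same partition, so all of its values
            // meet at a single reducer after the shuffle-and-sort barrier.
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }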

Phases of MapReduce

  • Initialization
  • Map
  • Shuffle
  • Sort
  • Reduce

Key Components

  • JobTracker - The primary functions of the job tracker are resource management (managing the task trackers), tracking resource availability, and task life-cycle management (tracking task progress, fault tolerance, etc.)
  • TaskTracker - The task tracker has the simple function of following the orders of the job tracker and periodically updating it with its progress status.
The steps involved in submitting a job for execution are (a minimal driver sketch follows the list):
  1. Client applications submit jobs to the JobTracker. 
  2. The JobTracker talks to the NameNode to determine the location of the data 
  3. The JobTracker locates TaskTracker nodes with available slots at or near the data 
  4. The JobTracker submits the work to the chosen TaskTracker nodes. 
  5. The TaskTracker nodes are monitored. If they do not submit heartbeat signals often enough, they are deemed to have failed and the work is scheduled on a different TaskTracker. 
  6. A TaskTracker will notify the JobTracker when a task fails. The JobTracker decides what to do then: it may resubmit the job elsewhere, it may mark that specific record as something to avoid, and it may even blacklist the TaskTracker as unreliable. 
  7. When the work is completed, the JobTracker updates its status.
  8. Client applications can poll the JobTracker for information.
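The client side of these steps is just a driver program that configures a job, submits it, and polls for completion. The sketch below assumes the word-count Mapper and Reducer classes shown earlier; the input and output paths come from the command line.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "word count");   // Job.getInstance(conf, "word count") on newer releases

            job.setJarByClass(WordCountDriver.class);
            job.setMapperClass(WordCount.TokenizerMapper.class);
            job.setCombinerClass(WordCount.IntSumReducer.class);   // optional local aggregation before the shuffle
            job.setReducerClass(WordCount.IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));    // input already loaded into HDFS
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  // results are retrieved from here

            // Submits the job and polls for progress and completion (steps 1 and 8 above).
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }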
A few noticeable facts about the JobTracker and TaskTracker: 
  • The JobTracker is a single point of failure for the Hadoop MapReduce service. If it goes down, all running jobs are halted. 
  • The task tracker is pre-configured with a number of slots indicating the number of tasks it can accept. When the job tracker tries to schedule a task, it looks for an empty slot in the task tracker running on the same server that hosts the DataNode where the data for that task resides. If none is found, it looks for a machine in the same rack. There is no consideration of system load during this allocation. 
  • HDFS is rack aware in the sense that the NameNode and the JobTracker obtain a list of rack IDs corresponding to each of the slave nodes (DataNodes) and create a mapping between the IP addresses and the rack IDs. HDFS uses this knowledge to replicate data across different racks so that data is not lost in the event of a complete rack power outage or switch failure.
  • Hadoop does speculative execution: if a machine in the cluster is slow and the map/reduce tasks running on it are holding up the entire map/reduce phase, redundant copies of the same task are run on other machines; whichever copy completes first reports back to the job tracker, and its results are carried forward into the next phase (see the configuration sketch after this list).
  • The task tracker spawns each task in a separate JVM process so that a task failure does not bring down the task tracker itself.
  • The task tracker keeps sending heartbeat messages to the job tracker to say that it is alive and to keep it updated with the number of empty slots available for running more tasks.
  • From version 0.21 of Hadoop, the job tracker does some checkpointing of its work in the filesystem. Whenever it starts up, it checks what it was doing up to the last checkpoint and resumes any incomplete jobs. Earlier, if the job tracker went down, all the active job information was lost.
  • The status and information about the job tracker and the task tracker are exposed via Jetty on a web interface.
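A few of these behaviours are tunable through ordinary configuration properties. The sketch below toggles speculative execution and the TaskTracker slot counts using the classic Hadoop 1.x (MRv1) property names; newer releases use mapreduce.* equivalents, and the slot counts are normally set in mapred-site.xml on each worker rather than in client code, so treat this purely as an illustration.

    import org.apache.hadoop.conf.Configuration;

    public class MrV1TuningSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // Disable speculative (redundant) execution of slow map/reduce tasks.
            conf.setBoolean("mapred.map.tasks.speculative.execution", false);
            conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

            // Number of map/reduce slots a TaskTracker advertises to the JobTracker.
            conf.setInt("mapred.tasktracker.map.tasks.maximum", 4);
            conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 2);

            System.out.println("speculative maps enabled: "
                    + conf.getBoolean("mapred.map.tasks.speculative.execution", true));
        }
    }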

Hadoop Modes

  • Local standalone mode: the NameNode, DataNode, JobTracker, and TaskTracker run as a single Java process. Note that in this case we don't need to start any service on any machine; it is just like executing a Java application (the "bin/hadoop" command is used to run programs).
  • Pseudo-distributed mode: the NameNode, DataNode, JobTracker, and TaskTracker run as separate Java processes, but on a single host. Here everything is part of the same physical machine (localhost/127.0.0.1).
  • Fully distributed mode: in this mode, Hadoop is spread across multiple machines, some of which are general-purpose workers and others are dedicated hosts for components such as the NameNode and JobTracker. This is a real cluster, the way Hadoop is used in production environments.
For the installation and working of HDFS + MapReduce, follow the next post.