Friday, July 18, 2014

Pig | High Level MapReduce

Pig is a high-level platform and scripting language for creating and running MapReduce programs on Hadoop. The language used on this platform is called Pig Latin. Pig Latin allows users to write complex MapReduce transformations in a simple scripting language. It defines a set of transformations on a data set, such as aggregate, join and sort, conceptually similar to SQL for RDBMS systems.

Pig has a rich set of built-in functions. Pig Latin can also be extended with UDFs (User Defined Functions), which can be written in Java, Python, JavaScript, Ruby or Groovy as per custom requirements and then called directly from the language. A detailed list of Pig built-in functions can be found in the Pig documentation.

Pig is complete in the sense that all the required data manipulations in Apache Hadoop can be done with Pig. It simplifies complex operations such as joins and filters when querying data in Hadoop.

Common uses of Pig include:
  • Extract, Transform, Load (ETL), e.g. processing large amounts of log data, cleaning bad entries and joining with other data sets.
  • Research on "raw" information, e.g. user audit logs where the schema may be unknown or inconsistent.

Key Features

  • Join
  • Sort
  • Filter 
  • Data Types 
  • Group By 
  • User Defined functions

Key Components

  • Pig Latin
    • It is a command based language
    • It is designed specifically for data transformation and flow expression.
  • Execution Environment
    • It is the environment in which Pig Latin commands are executed.
    • It is basically a high-level, abstract environment for MapReduce.
    • It also supports a local mode for testing and small operations.
  • Pig compiler
    • It converts Pig Latin to MapReduce jobs.
    • The Pig Latin compiler optimizes execution.
    • We get optimization improvements by default with Pig updates.
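
To make the compiler's role a little more concrete, here is a rough plain-Python sketch of the map, shuffle and reduce phases that a simple "group users by country and count them" script would loosely correspond to. This is not Pig code and not how Pig is implemented internally; the sample rows and function names are assumptions made purely for illustration.

```python
from collections import defaultdict

# Hypothetical sample rows: (name, country)
rows = [("bally", "India"), ("bob", "USA"), ("amit", "India")]

def map_phase(records):
    """Emit (key, value) pairs; here the key is the country."""
    for name, country in records:
        yield country, name

def shuffle(pairs):
    """Collect all values that share a key, as happens between map and reduce."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reduce_phase(grouped):
    """Collapse each group of values into a single count."""
    return {key: len(values) for key, values in grouped.items()}

counts = reduce_phase(shuffle(map_phase(rows)))
print(counts)
```

In Pig Latin the same idea is expressed declaratively with grouping and aggregation, and the compiler decides how to lay it out as jobs.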

Pig Data Types

Pig supports all of the primitive types available in most programming languages; they can be compared with the types in the java.lang package for reference. In addition, it supports bytearray.
  • Simple Types: 
    • int 
      • An integer. Ints are represented in interfaces by java.lang.Integer
    • long 
      • A long integer. Longs are represented in interfaces by java.lang.Long
    • float 
      •  A floating-point number. Floats are represented in interfaces by java.lang.Float
    • double 
      • A double-precision floating-point number. Doubles are represented in interfaces by java.lang.Double
    • chararray 
      • A string or character array. CharArrays are represented in interfaces by java.lang.String
    • bytearray 
      • A blob or array of bytes. ByteArrays are represented in interfaces by a Java class DataByteArray that wraps a Java byte[]. 
  • Complex Types: 
    • Map 
      • A map in Pig is a chararray to data element mapping, where that element can be any Pig type, including a complex type. 
      • The chararray is called a key and is used as an index to find the element, referred to as the value. Because Pig does not know the type of the value, it assumes it is a bytearray, although the actual value may be of a different type. If we know what the actual type is (or what we want it to be), we can cast it. If we do not cast the value, Pig makes a best guess based on how we use the value in the script. If the value is of a type other than bytearray, Pig figures that out at runtime and handles it.
      • There is no requirement that all values in a map must be of the same type. It is perfectly fine to have a map with two keys profession and salary, where the value for profession is a chararray and the value for salary is an int.
    • Tuple 
      •  A tuple is a fixed-length, ordered collection of Pig data elements. Tuples are divided into fields, with each field containing one data element. 
      • Again, these elements can be of any type (just like map values), and they do not all need to be the same type. 
      • A tuple is analogous to a row in SQL, with the fields being SQL columns. Because tuples are ordered, it is possible to refer to the fields by position. 
      • A tuple may have a schema associated with it that describes each field's type and provides a name for each field. This allows Pig to check that the data in the tuple is what the user expects and it allows the user to reference the fields of the tuple by name. 
      • Tuple constants use parentheses to indicate the tuple and commas to delimit fields in the tuple. For example, ('life', 31) describes a tuple constant with two fields.
    • Bag 
      • A bag is an unordered collection of tuples. Because it has no order, it is not possible to reference tuples in a bag by position. 
      • Like tuples, a bag may have a schema associated with it. In the case of a bag, the schema describes all tuples within the bag. 
      • Bag constants are constructed using braces, with tuples in the bag separated by commas. For example, {('bally', 30), ('amit', 32), ('yogesh', 33)} constructs a bag with three tuples, each with two fields. 
      • Pig does not provide a list or set type that can store items of any type. It is possible to simulate a set type using a bag, by wrapping the desired type in a tuple of one field. For instance, to store a set of integers, we can create a bag of tuples with one field each, where that field is an int. 
      • Bag is the one type in Pig that is not required to fit into memory. Because bags are used to store collections when grouping, they can become quite large. Pig may flush bags to disk when necessary, keeping only a portion of the bag in memory.
Details on Pig data types can be found in the Pig documentation.
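
As a loose analogy only, the three complex types described above can be pictured with ordinary Python structures. The sketch is plain Python, not Pig Latin; the salary figure and variable names are made up for illustration, while the tuple and bag values reuse the constants from the text.

```python
# Map: chararray keys, values of any type (types may differ per key).
person = {"profession": "engineer", "salary": 50000}

# Tuple: fixed-length, ordered fields, addressable by position.
row = ("life", 31)
first_field = row[0]

# Bag: a collection of tuples. A list stands in for it here,
# but the order of the list carries no meaning for a bag.
bag = [("bally", 30), ("amit", 32), ("yogesh", 33)]

# Simulating a collection of ints: wrap each int in a one-field tuple.
int_bag = [(1,), (2,), (3,)]
values = [t[0] for t in int_bag]

print(first_field, len(bag), values)
```

The analogy breaks down in one important respect: a Python list always lives in memory, whereas Pig may keep only part of a bag in memory.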

Pig Schema

Pig has a very relaxed attitude towards schema and typing. If a schema for the data is available, Pig makes use of it (for error checking and optimization). If the schema is missing, Pig still processes the data, making best guesses based on how the script treats the data.

We can explicitly define and instruct Pig for schema:

With data types:
       users = load 'user_information' as (name:chararray, login:chararray, country:chararray, age:float);

Without explicit data types:
       users = load 'user_information' as (name, login, country, age); 

Here Pig expects the data to have four fields. If a record has more fields, the extra ones are truncated; if it has fewer, the missing ones are padded with nulls.
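
The truncate-or-pad behaviour can be pictured with a small plain-Python sketch. It only models the rule stated above and is not Pig's actual loader code; the helper name `conform` and the sample records are hypothetical.

```python
def conform(fields, width):
    """Return exactly `width` fields: drop extras, pad missing ones with None (standing in for null)."""
    kept = tuple(fields[:width])
    return kept + (None,) * (width - len(kept))

schema = ("name", "login", "country", "age")

# Hypothetical records: one with too few fields, one with too many.
short_row = ["anish", "asneh"]
long_row = ["anish", "asneh", "India", "30", "extra"]

print(dict(zip(schema, conform(short_row, len(schema)))))
print(dict(zip(schema, conform(long_row, len(schema)))))
```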

Pig Modes

Pig scripts can be run in local mode as well as in MapReduce mode.
  • Local Mode
    • To run Pig scripts in local mode, no Hadoop or HDFS installation is required. In this mode, all files are read from, and scripts run on, the local host and file system. 
  • MapReduce Mode
    • To run pig scripts in MapReduce mode, we need a Hadoop cluster and HDFS installation.


For the Pig setup we will use one of the three nodes configured earlier, together with all three Hadoop2 (MapReduce and HDFS) nodes (set up as described in an earlier post).

Note that we are using the following details for installation (for the complete setup):

     - Installation base directory:  
      • /home/anishsneh/installs
     - Installation user name:
      • anishsneh
     - Hostname: 
      • server01
Steps to install Pig:
  1. Install Pig - we will use Apache Pig 0.13.0 (with Hadoop2)
    • Download pig-0.13.0.tar.gz from the Pig website; note that we will run Pig on top of the Hadoop2 stack
    • Extract downloaded package to anishsneh@server01:/home/anishsneh/installs, such that we have:
       [anishsneh@server01 installs]$ ls -ltr pig-0.13.0
      total 34856
      drwxr-xr-x. 2 anishsneh anishsneh     4096 Jun 29 02:30 scripts
      -rw-rw-r--. 1 anishsneh anishsneh  8756785 Jun 29 02:30 pig-0.13.0-withouthadoop-h2.jar
      -rw-rw-r--. 1 anishsneh anishsneh  8748886 Jun 29 02:30 pig-0.13.0-withouthadoop-h1.jar
      -rw-rw-r--. 1 anishsneh anishsneh 17826437 Jun 29 02:30 pig-0.13.0-h1.jar
      -rw-rw-r--. 1 anishsneh anishsneh    23441 Jun 29 02:30 ivy.xml
      drwxr-xr-x. 4 anishsneh anishsneh     4096 Jun 29 02:31 shims
      drwxr-xr-x. 3 anishsneh anishsneh     4096 Jun 29 02:31 lib-src
      -rw-rw-r--. 1 anishsneh anishsneh     2564 Jun 29 02:31 RELEASE_NOTES.txt
      -rw-rw-r--. 1 anishsneh anishsneh     1307 Jun 29 02:31 README.txt
      -rw-rw-r--. 1 anishsneh anishsneh     2125 Jun 29 02:31 NOTICE.txt
      -rw-rw-r--. 1 anishsneh anishsneh    11358 Jun 29 02:31 LICENSE.txt
      -rw-rw-r--. 1 anishsneh anishsneh   163510 Jun 29 02:31 CHANGES.txt
      -rw-rw-r--. 1 anishsneh anishsneh    87837 Jun 29 02:31 build.xml
      drwxr-xr-x. 3 anishsneh anishsneh     4096 Jul 18 21:14 contrib
      drwxr-xr-x. 2 anishsneh anishsneh     4096 Jul 18 21:14 conf
      drwxr-xr-x. 6 anishsneh anishsneh     4096 Jul 18 21:14 docs
      drwxr-xr-x. 2 anishsneh anishsneh     4096 Jul 18 21:14 ivy
      drwxr-xr-x. 3 anishsneh anishsneh     4096 Jul 18 21:14 lib
      drwxr-xr-x. 2 anishsneh anishsneh     4096 Jul 18 21:14 license
      drwxr-xr-x. 8 anishsneh anishsneh     4096 Jul 18 21:14 src
      drwxr-xr-x. 5 anishsneh anishsneh     4096 Jul 18 21:14 tutorial
      drwxr-xr-x. 9 anishsneh anishsneh     4096 Jul 18 21:14 test
      drwxr-xr-x. 2 anishsneh anishsneh     4096 Jul 18 21:14 bin
    • Create hdfs://server01:9000/data/pig directory on HDFS and change its permissions to 777 on server01 (for this demo)
           [anishsneh@server01 installs]$ hadoop fs -mkdir /data/pig
           [anishsneh@server01 installs]$ hadoop fs -chmod 777 /data/pig
  2. Configure Pig
    • Set PIG_HOME=/home/anishsneh/installs/pig-0.13.0 in ~/.bashrc (or wherever environment variables are maintained), then reload the profile/bash

Getting Started

Pig Latin code can be executed by one of the following methods:
  • Script
    • Execute commands in a file
    • $PIG_HOME/bin/pig script-file.pig
  • Grunt
    • Interactive Shell for executing Pig Commands.
    • Started when script file is NOT provided.
    • Can execute scripts from Grunt via run or exec commands.
  • Embedded
    • Execute Pig commands using PigServer class.
    • Just like JDBC to execute SQL.
    • Can have programmatic access to Grunt via PigRunner class.
Let's start with simple pig scripts.
  • LOAD

    LOAD 'path_to_data' [USING function] [AS schema];

           'path_to_data': name of the directory or file
                   - Must be in single quotes
           USING: specifies the load function to use
                   - By default uses PigStorage, which parses each line into fields using a delimiter
                   - Default delimiter is tab ('\t')
                   - A different delimiter can be passed as an argument, e.g. PigStorage(',')
           AS: assigns a schema to incoming data
                   - Assigns names to fields
                   - Declares types of fields
           users = LOAD '/data/pig/users.csv' USING PigStorage(',') AS (name:chararray, country:chararray);

    • Create users.csv such that
      [anishsneh@server01 installs]$ cat users.csv 
    • Upload users.csv to HDFS
      [anishsneh@server01 installs]$ hadoop fs -copyFromLocal users.csv /data/pig
    • Start grunt:
      [anishsneh@server01 installs]$ $PIG_HOME/bin/pig
    • Load data:
      grunt> users = LOAD '/data/pig/users.csv' USING PigStorage(',') AS (name:chararray, country:chararray);
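
To illustrate what a delimiter-based load function does conceptually, here is a small plain-Python sketch. It is only an analogy for the LOAD ... USING ... AS statement and not PigStorage's real implementation; the function names and the sample file contents are assumptions made for illustration.

```python
def parse_line(line, delimiter="\t"):
    """Split one input line into fields on a literal delimiter (tab by default)."""
    return tuple(line.rstrip("\n").split(delimiter))

def load(lines, delimiter, names):
    """Attach field names to each parsed line, similar in spirit to an AS clause."""
    return [dict(zip(names, parse_line(line, delimiter))) for line in lines]

# Hypothetical file contents, for illustration only.
sample = ["Anish,India\n", "Bob,USA\n"]
users = load(sample, ",", ("name", "country"))
print(users)
```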
  • DUMP and STORE 
    • DUMP displays the results on the screen.
    • STORE saves results (typically to a file).
    • No action is taken until a DUMP or STORE command is encountered.
    • Until then, Pig will parse, validate and analyze statements but not execute them.
    • Hadoop data is usually quite large, and it doesn't make sense to print it to the screen.
    • The common pattern is to persist results to Hadoop (HDFS, HBase). This is done with the STORE command.
    • For information and debugging purposes we can print a small subset to the screen.
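
The "nothing runs until DUMP or STORE" behaviour is a form of lazy evaluation. The toy Python class here mimics the idea under simplifying assumptions: the class `Relation`, its methods and the sample data are all hypothetical and have nothing to do with Pig's real internals.

```python
class Relation:
    """A toy relation that records steps and runs them only when dump() is called."""

    def __init__(self, source, steps=()):
        self.source = source
        self.steps = tuple(steps)

    def filter(self, predicate):
        # Only records the step; nothing is computed yet.
        return Relation(self.source, self.steps + (("filter", predicate),))

    def dump(self):
        # Execution happens here, mirroring how DUMP/STORE trigger the job.
        rows = list(self.source())
        for kind, fn in self.steps:
            if kind == "filter":
                rows = [r for r in rows if fn(r)]
        return rows

reads = []  # records each time the source is actually read

def read_users():
    reads.append("read")
    return [("Anish", "India"), ("Bob", "USA")]

users = Relation(read_users)
from_india = users.filter(lambda r: r[1] == "India")
print(len(reads))         # the source has not been read at this point
print(from_india.dump())  # reading and filtering happen only now
```

Deferring the work this way is what gives a planner the chance to look at the whole script before running anything.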

    DUMP Data on Screen:
    grunt> DUMP users;
    2014-07-18 21:18:57,991 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapreduce.job.counters.limit is deprecated. Instead, use mapreduce.job.counters.max
    2014-07-18 21:18:57,992 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
    2014-07-18 21:18:58,016 [main] INFO - Pig features used in the script: UNKNOWN
    2014-07-18 21:18:58,020 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}
    2014-07-18 21:18:58,050 [main] INFO - Key [pig.schematuple] was not set... will not generate code.
    2014-07-18 21:18:58,073 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
    2014-07-18 21:18:58,073 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
    2014-07-18 21:18:58,150 [main] INFO  org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved output of task 'attempt__0001_m_000001_1' to hdfs://server01:9000/tmp/temp2110693685/tmp2035300874/_temporary/0/task__0001_m_000001
    2014-07-18 21:18:58,174 [main] WARN - SchemaTupleBackend has already been initialized
    2014-07-18 21:18:58,186 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
    2014-07-18 21:18:58,186 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
    (Anish,India )

    STORE Results:
    grunt> STORE users INTO '/data/pig/results';
    After the job completes, we will see that the contents of the users relation have been stored in the HDFS directory hdfs://server01:9000/data/pig/results