Thursday, September 18, 2014

Flume | Setup

We learnt about Flume in the previous post. Here we will set up and run a Flume agent with an Avro source and an HDFS sink, and write a Java based client to send events to it.

Installation

For the agent installation we will use one of the three nodes set up earlier to host the agent JVM, and all three HDFS nodes for the sink (as per the setup described in the earlier post).
We will set up a single agent:

Note that here we are using the following details for the installation:

     - Installation base directory:
      • /home/anishsneh/installs
     - Installation user name:
      • anishsneh
     - Hostname:
      • server01
Steps to install Flume NG agent:
  1. Install Flume - we will use Apache Flume 1.5.0.1 (built for Hadoop 2)
    • Download apache-flume-1.5.0.1-bin.tar.gz from the Flume website; note that we are using Hadoop 2 for the sink
    • Extract the downloaded package to /home/anishsneh/installs on server01, such that we have (an extraction command sketch follows the listing):
           [anishsneh@server01 installs]$ ls -ltr apache-flume-1.5.0.1-bin
           total 128
           -rw-r--r--.  1 anishsneh anishsneh  1779 Mar 28 15:15 README
           -rw-r--r--.  1 anishsneh anishsneh  6172 Mar 28 15:15 DEVNOTES
           -rw-r--r--.  1 anishsneh anishsneh 22517 May  6 16:29 LICENSE
           -rw-r--r--.  1 anishsneh anishsneh 61591 Jun 10 13:56 CHANGELOG
           -rw-r--r--.  1 anishsneh anishsneh   249 Jun 10 14:08 NOTICE
           -rw-r--r--.  1 anishsneh anishsneh  1591 Jun 10 14:08 RELEASE-NOTES
           drwxr-xr-x. 10 anishsneh anishsneh  4096 Jun 10 15:10 docs
           drwxrwxr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 lib
           drwxrwxr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 tools
           drwxr-xr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 bin
           drwxr-xr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 conf
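           
    • For reference, extracting the tarball from the command line should produce the layout above (assuming it was downloaded to the installs directory):
            [anishsneh@server01 installs]$ tar -xzf apache-flume-1.5.0.1-bin.tar.gz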
          
    • Create the hdfs://server01:9000/data/flume directory on HDFS and, for this demo, change its permissions to 777 on server01 (the -p flag creates the parent /data directory if it does not already exist)
            [anishsneh@server01 installs]$ hadoop fs -mkdir -p /data/flume
          
           [anishsneh@server01 installs]$ hadoop fs -chmod 777 /data/flume
          
  2. Configure Flume
    • Set FLUME_HOME=/home/anishsneh/installs/apache-flume-1.5.0.1-bin in ~/.bashrc (or wherever you maintain environment variables) and reload the profile, as sketched below
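       For example, appending the following line to ~/.bashrc (path as per the layout above) and then reloading the profile would do:
            export FLUME_HOME=/home/anishsneh/installs/apache-flume-1.5.0.1-bin
           
            [anishsneh@server01 ~]$ source ~/.bashrc
           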
    • Copy $FLUME_HOME/conf/flume-conf.properties.template to $FLUME_HOME/conf/flume-conf.properties
            [anishsneh@server01 apache-flume-1.5.0.1-bin]$ cp conf/flume-conf.properties.template conf/flume-conf.properties
          
    • Edit $FLUME_HOME/conf/flume-conf.properties with the following contents (the agent is named demo_agent; the HDFS sink rolls to a new file every 50 events and writes event bodies as plain text):
       demo_agent.sources = demo-application-source
       demo_agent.sinks = hdfs-cluster-sink
       demo_agent.channels = memory-channel-01
      
       # set channel for source
       demo_agent.sources.demo-application-source.channels = memory-channel-01
      
       # set channel for sink
       demo_agent.sinks.hdfs-cluster-sink.channel = memory-channel-01
      
       # properties of demo-application-source
       demo_agent.sources.demo-application-source.type = avro
       demo_agent.sources.demo-application-source.bind = server01
       demo_agent.sources.demo-application-source.port = 55555
      
       # properties of memory-channel-01
       demo_agent.channels.memory-channel-01.type = memory
       demo_agent.channels.memory-channel-01.capacity = 1000
       demo_agent.channels.memory-channel-01.transactionCapacity = 100
      
       # properties of hdfs-cluster-sink
       demo_agent.sinks.hdfs-cluster-sink.type = hdfs
       demo_agent.sinks.hdfs-cluster-sink.hdfs.path = hdfs://server01:9000/data/flume
       demo_agent.sinks.hdfs-cluster-sink.hdfs.rollCount = 50
       demo_agent.sinks.hdfs-cluster-sink.hdfs.filePrefix = demo-event
       demo_agent.sinks.hdfs-cluster-sink.hdfs.writeFormat = Text
       demo_agent.sinks.hdfs-cluster-sink.hdfs.fileType = DataStream
          
    • Start the Flume agent with the following command (--conf points the script at the configuration directory):
            [anishsneh@server01 apache-flume-1.5.0.1-bin]$ $FLUME_HOME/bin/flume-ng agent --name demo_agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume-conf.properties
          
       We will see a stream of log statements scrolling by; these are the agent's default logs. Congratulations, our Flume agent is up and running (with the Avro source and HDFS sink configured above).
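       As a quick sanity check (assuming netstat is available on server01), we can verify that the Avro source is listening on the configured port:
            [anishsneh@server01 ~]$ netstat -tln | grep 55555
           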
  3. Writing Flume Client
    • Create a Maven based project in Eclipse containing the three classes listed below (AvroClient, RpcClientFacade and User)
    • Add dependencies to pom.xml, such that we have:
           
      
       <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.anishsneh.demo</groupId>
        <artifactId>flume-avro-client</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>flume-avro-client</name>
        <url>http://maven.apache.org</url>
        <dependencies>
         <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.8.2</version>
          <scope>test</scope>
         </dependency>
         <dependency>
          <groupId>org.apache.flume</groupId>
          <artifactId>flume-ng-core</artifactId>
          <version>1.5.0</version>
         </dependency>
         <dependency>
          <groupId>org.apache.flume</groupId>
          <artifactId>flume-ng-sdk</artifactId>
          <version>1.5.0</version>
         </dependency>
         <dependency>
          <groupId>org.apache.flume</groupId>
          <artifactId>flume-ng-configuration</artifactId>
          <version>1.5.0</version>
         </dependency>
         <dependency>
          <groupId>com.thoughtworks.xstream</groupId>
          <artifactId>xstream</artifactId>
          <version>1.4.4</version>
         </dependency>
         <dependency>
          <groupId>org.codehaus.jettison</groupId>
          <artifactId>jettison</artifactId>
          <version>1.3.4</version>
         </dependency>
        </dependencies>
       </project>
           
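       A standard Maven build from the project root should now resolve these dependencies and compile the client:
            [anishsneh@server01 flume-avro-client]$ mvn clean package
           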
    • Create AvroClient.java with the following contents; it sends 500 JSON serialized User events to the agent's Avro source, one every 500 ms
      package com.anishsneh.demo.flume.avro;
      
      import java.util.UUID;
      
      import com.anishsneh.demo.flume.avro.vo.User;
      
      public class AvroClient {
       public static void main(String[] args) {
        RpcClientFacade client = new RpcClientFacade();
         // Initialize the client with the host and port on which the agent's Avro source listens (server01 per our config)
         client.init("server01", 55555);
        for (int i = 1; i <= 500; i++) {
         User user = new User();
         user.setId(UUID.randomUUID().toString());
         user.setEmail("me_" + i + "@anishsneh.com");
         user.setCreatedTime(System.currentTimeMillis());
         user.setLogin("login_" + i);
         user.setName("USER_NAME_" + i);
         client.sendDataToFlume(user.toString());
         System.out.println("===============================");
         System.out.println(user.toString());
         try {
          Thread.sleep(500);
         } catch (Exception e) {
          e.printStackTrace();
         }
        }
      
        client.cleanUp();
       }
      }
          
    • Create RpcClientFacade.java with the following contents; it wraps Flume's RpcClient and recreates the connection if an event delivery fails
      package com.anishsneh.demo.flume.avro;
      
      import java.nio.charset.Charset;
      import java.util.HashMap;
      import java.util.Map;
      
      import org.apache.flume.Event;
      import org.apache.flume.EventDeliveryException;
      import org.apache.flume.api.RpcClient;
      import org.apache.flume.api.RpcClientFactory;
      import org.apache.flume.event.EventBuilder;
      
      public class RpcClientFacade {
      
       private RpcClient client;
       private String hostname;
       private int port;
      
       public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
       }
      
       public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Map<String, String> map = new HashMap<String, String>();
        map.put("header_1", "Demo header 1");
        map.put("header_2", "Demo header 2");
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"), map);
      
        // Send the event
        try {
         client.append(event);
         } catch (EventDeliveryException e) {
          // clean up and recreate the client; note that this simple
          // example drops the failed event instead of retrying it
         client.close();
         client = null;
         client = RpcClientFactory.getDefaultInstance(hostname, port);
        }
       }
      
       public void cleanUp() {
        // Close the RPC connection
        client.close();
       }
      }
      
    • Create User.java with the following contents; its toString() serializes the object to JSON using XStream with the Jettison driver
      package com.anishsneh.demo.flume.avro.vo;
      
      import java.io.Serializable;
      
      import com.thoughtworks.xstream.XStream;
      import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
      
      public class User implements Serializable{
       
       private static final long serialVersionUID = 1L;
       
       private String id;
       private String name;
       private String login;
       private String email;
       private long createdTime;
       
       public String getId() {
        return id;
       }
       public void setId(String id) {
        this.id = id;
       }
       public String getName() {
        return name;
       }
       public void setName(String name) {
        this.name = name;
       }
       public String getLogin() {
        return login;
       }
       public void setLogin(String login) {
        this.login = login;
       }
       public String getEmail() {
        return email;
       }
       public void setEmail(String email) {
        this.email = email;
       }
       public long getCreatedTime() {
        return createdTime;
       }
       public void setCreatedTime(long createdTime) {
        this.createdTime = createdTime;
       }
       
       @Override
       public String toString(){
        XStream xstream = new XStream(new JettisonMappedXmlDriver());
        return xstream.toXML(this);
       }
      }
      
      
          
    • Run com.anishsneh.demo.flume.avro.AvroClient.main() from Eclipse, or from the command line as sketched below
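       As a sketch, the Maven exec plugin can run the client from the project root (assuming the build above succeeded):
            [anishsneh@server01 flume-avro-client]$ mvn exec:java -Dexec.mainClass="com.anishsneh.demo.flume.avro.AvroClient"
           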
    • Check the HDFS directory, i.e. hdfs://server01:9000/data/flume; it will contain the events collected from the Avro source, i.e. our Java client:
       [Screenshot: HDFS - Flume Events]
       Contents of each file will look like:
       [Screenshot: HDFS - Event File Contents]
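       To inspect the collected files from the command line (a sketch; actual file names will vary, as Flume appends a generated suffix to the demo-event prefix):
            [anishsneh@server01 ~]$ hadoop fs -ls /data/flume
           
            [anishsneh@server01 ~]$ hadoop fs -cat /data/flume/demo-event.*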