We learnt about Flume in the previous post. Here we will set up and run a Flume agent with an Avro source and a Java based client.
Installation
For the agent installation we will use one of the three nodes set up earlier for the agent JVM, and all three HDFS nodes for the sink (as described in the earlier post). We will set up a single agent:
Note that we are using the following details for the installation (for the complete setup):
- Installation base directory: /home/anishsneh/installs
- Installation user: anishsneh
- Host: server01
Install Flume - we will use Apache Flume 1.5.0.1 (with Hadoop 2)
- Download apache-flume-1.5.0.1-bin.tar.gz from the Flume website; note that we are using Hadoop 2 for the sink
- Extract the downloaded package to anishsneh@server01:/home/anishsneh/installs, such that we have:
[anishsneh@server01 installs]$ ls -ltr apache-flume-1.5.0.1-bin
total 128
-rw-r--r--.  1 anishsneh anishsneh  1779 Mar 28 15:15 README
-rw-r--r--.  1 anishsneh anishsneh  6172 Mar 28 15:15 DEVNOTES
-rw-r--r--.  1 anishsneh anishsneh 22517 May  6 16:29 LICENSE
-rw-r--r--.  1 anishsneh anishsneh 61591 Jun 10 13:56 CHANGELOG
-rw-r--r--.  1 anishsneh anishsneh   249 Jun 10 14:08 NOTICE
-rw-r--r--.  1 anishsneh anishsneh  1591 Jun 10 14:08 RELEASE-NOTES
drwxr-xr-x. 10 anishsneh anishsneh  4096 Jun 10 15:10 docs
drwxrwxr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 lib
drwxrwxr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 tools
drwxr-xr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 bin
drwxr-xr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 conf
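For reference, the extraction itself is a standard tar invocation (a minimal sketch, assuming the tarball was downloaded into the installs directory):
[anishsneh@server01 installs]$ tar -xzf apache-flume-1.5.0.1-bin.tar.gz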
- Create the hdfs://server01:9000/data/flume directory on HDFS and change its permissions to 777 on server01 (for this demo):
[anishsneh@server01 installs]$ hadoop fs -mkdir /data/flume
[anishsneh@server01 installs]$ hadoop fs -chmod 777 /data/flume
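As an optional sanity check, the directory and its permissions can be verified with:
[anishsneh@server01 installs]$ hadoop fs -ls /data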
Configure Flume
- Set FLUME_HOME=/home/anishsneh/installs/apache-flume-1.5.0.1-bin in ~/.bashrc (or wherever you maintain environment variables) and reload the profile, as sketched below
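A minimal sketch of the relevant ~/.bashrc additions (putting $FLUME_HOME/bin on the PATH is our own convenience, not required by the setup):
export FLUME_HOME=/home/anishsneh/installs/apache-flume-1.5.0.1-bin
export PATH=$PATH:$FLUME_HOME/bin
[anishsneh@server01 ~]$ source ~/.bashrc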
- Copy $FLUME_HOME/conf/flume-conf.properties.template to $FLUME_HOME/conf/flume-conf.properties:
[anishsneh@server01 apache-flume-1.5.0.1-bin]$ cp conf/flume-conf.properties.template conf/flume-conf.properties
- Edit $FLUME_HOME/conf/flume-conf.properties with the following contents:
demo_agent.sources = demo-application-source
demo_agent.sinks = hdfs-cluster-sink
demo_agent.channels = memory-channel-01

# set channel for source
demo_agent.sources.demo-application-source.channels = memory-channel-01

# set channel for sink
demo_agent.sinks.hdfs-cluster-sink.channel = memory-channel-01

# properties of demo-application-source
demo_agent.sources.demo-application-source.type = avro
demo_agent.sources.demo-application-source.bind = server01
demo_agent.sources.demo-application-source.port = 55555

# properties of memory-channel-01
demo_agent.channels.memory-channel-01.type = memory
demo_agent.channels.memory-channel-01.capacity = 1000
demo_agent.channels.memory-channel-01.transactionCapacity = 100

# properties of hdfs-cluster-sink
demo_agent.sinks.hdfs-cluster-sink.type = hdfs
demo_agent.sinks.hdfs-cluster-sink.hdfs.path = hdfs://server01:9000/data/flume
demo_agent.sinks.hdfs-cluster-sink.hdfs.rollCount = 50
demo_agent.sinks.hdfs-cluster-sink.hdfs.filePrefix = demo-event
demo_agent.sinks.hdfs-cluster-sink.hdfs.writeFormat = Text
demo_agent.sinks.hdfs-cluster-sink.hdfs.fileType = DataStream
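A few notes on this configuration: the Avro source listens on server01:55555, and the memory channel is fast but holds events only in RAM, so buffered events are lost if the agent dies. On the sink side, hdfs.rollCount = 50 rolls to a new file after 50 events, while hdfs.fileType = DataStream with hdfs.writeFormat = Text writes plain text files instead of Hadoop SequenceFiles. Also note that the HDFS sink's size and time based rolling defaults (hdfs.rollSize = 1024 bytes, hdfs.rollInterval = 30 seconds) remain in effect unless explicitly set to 0, so files may roll before reaching 50 events.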
- Start the Flume agent with the following command:
[anishsneh@server01 apache-flume-1.5.0.1-bin]$ $FLUME_HOME/bin/flume-ng agent --name demo_agent --conf-file $FLUME_HOME/conf/flume-conf.properties
We will see lots of log statements scrolling; these are the agent's default log statements. Congratulations, our Flume agent is up and running (with the Avro source and HDFS sink as configured).
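Before writing a Java client, the Avro source can be smoke-tested with the avro-client tool bundled with Flume (a quick sketch; /tmp/flume-test.txt is an assumed sample file, each of its lines is sent as one event):
[anishsneh@server01 apache-flume-1.5.0.1-bin]$ echo "hello flume" > /tmp/flume-test.txt
[anishsneh@server01 apache-flume-1.5.0.1-bin]$ $FLUME_HOME/bin/flume-ng avro-client --host server01 --port 55555 --filename /tmp/flume-test.txt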
Writing Flume Client
- Create a Maven based project in Eclipse with the packages com.anishsneh.demo.flume.avro and com.anishsneh.demo.flume.avro.vo, as used below:
- Add dependencies to pom.xml such that:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.anishsneh.demo</groupId>
    <artifactId>flume-avro-client</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>flume-avro-client</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>com.thoughtworks.xstream</groupId>
            <artifactId>xstream</artifactId>
            <version>1.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jettison</groupId>
            <artifactId>jettison</artifactId>
            <version>1.3.4</version>
        </dependency>
    </dependencies>
</project>
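As an aside, the Flume developer guide notes that a standalone RPC client strictly needs only the flume-ng-sdk artifact; flume-ng-core and flume-ng-configuration are not required for the client itself, and xstream/jettison are used here only to serialize our User POJO to JSON in its toString() method.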
- Create AvroClient.java with the following contents:
package com.anishsneh.demo.flume.avro;

import java.util.UUID;

import com.anishsneh.demo.flume.avro.vo.User;

public class AvroClient {

    public static void main(String[] args) {
        RpcClientFacade client = new RpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("192.168.126.130", 55555);
        for (int i = 1; i <= 500; i++) {
            User user = new User();
            user.setId(UUID.randomUUID().toString());
            user.setEmail("me_" + i + "@anishsneh.com");
            user.setCreatedTime(System.currentTimeMillis());
            user.setLogin("login_" + i);
            user.setName("USER_NAME_" + i);
            client.sendDataToFlume(user.toString());
            System.out.println("===============================");
            System.out.println(user.toString());
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        client.cleanUp();
    }
}
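Here 192.168.126.130 is assumed to be the IP address of server01, where the Avro source is bound; the port must match the source's 55555. Since the client sends 500 events with a 500 ms pause between them, a full run takes roughly four minutes.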
- Create RpcClientFacade.java with the following contents:
package com.anishsneh.demo.flume.avro;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class RpcClientFacade {

    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Map<String, String> map = new HashMap<String, String>();
        map.put("header_1", "Demo header 1");
        map.put("header_2", "Demo header 2");
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"), map);
        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // clean up and recreate the client
            client.close();
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);
        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}
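One thing to note about sendDataToFlume(): when append() throws an EventDeliveryException the facade closes and recreates the RpcClient, but the failed event is not re-sent, so it is silently dropped. A production client would typically retry or buffer the event after reconnecting.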
- Create User.java with the following contents:
package com.anishsneh.demo.flume.avro.vo;

import java.io.Serializable;

import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;

public class User implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String login;
    private String email;
    private long createdTime;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getLogin() {
        return login;
    }

    public void setLogin(String login) {
        this.login = login;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public long getCreatedTime() {
        return createdTime;
    }

    public void setCreatedTime(long createdTime) {
        this.createdTime = createdTime;
    }

    @Override
    public String toString() {
        XStream xstream = new XStream(new JettisonMappedXmlDriver());
        return xstream.toXML(this);
    }
}
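Since toString() serializes the object with XStream's JettisonMappedXmlDriver, each event body is a JSON document keyed by the fully qualified class name, something like (field values here are illustrative):
{"com.anishsneh.demo.flume.avro.vo.User":{"id":"...","name":"USER_NAME_1","login":"login_1","email":"me_1@anishsneh.com","createdTime":1400000000000}}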
- Run com.anishsneh.demo.flume.avro.AvroClient.main()
- Check the HDFS directory, i.e. hdfs://server01:9000/data/flume; it will contain the events collected from the Avro source, i.e. our Java client:
[Image: HDFS - Flume Events]
[Image: HDFS - Event File Contents]
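The same check can be done from the command line (the HDFS sink appends a generated numeric suffix to each demo-event file, hence the wildcard):
[anishsneh@server01 installs]$ hadoop fs -ls /data/flume
[anishsneh@server01 installs]$ hadoop fs -cat '/data/flume/demo-event.*'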