Friday, July 11, 2014

Flume | Streaming Events

Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of streaming event data into the Hadoop Distributed File System (HDFS), or possibly into some other centralized data store. It is robust and fault tolerant, with tunable reliability mechanisms for failover and recovery.
Flume uses Source and Sink abstractions and a pipeline-style data flow to link them together.

Flume - A High Level View

Why Flume

In a big data deployment, data analysis is the second half of the game; it can happen only once the data has been collected, so getting the data into a Hadoop cluster is the first step for any big data platform.
Loading large, slowly moving chunks of data directly into HDFS is easy enough, but when it comes to fast-moving data arriving at much higher velocity, HDFS alone is not the right choice, or at least not self-sufficient.
This is where a middleman is needed: something that can absorb fast incoming streams and, under the hood, keep flushing them to HDFS.

When we talk about loading data to HDFS we should keep a few things in mind:
  • HDFS is not a real filesystem in the traditional sense. A file in HDFS exists only as a directory entry and shows a length of zero until the file is closed. This means that if data is written to a file for an extended period without closing it, a network disconnect from the client will leave you with nothing but an empty file for all your effort. That may lead you to conclude that it is wise to write small files so you can close them as soon as possible.
  • Another problem is that Hadoop does not like lots of tiny files: HDFS metadata is kept in memory on the NameNode, so the more files you create, the more RAM you use. From a MapReduce perspective, too, writing tiny files leads to poor efficiency.
  • We can load data into Hadoop using the HDFS APIs or shell commands, but those work well only when the data is ready to upload and we have all of it in hand. Since Hadoop is designed for batch operations, dealing with real-time streaming data is problematic. Suppose we have a system generating data at a massive rate: what should the batch frequency for loading it into HDFS be? It might vary with the need, but a batch approach is certainly not the right solution.
As we can see, due to its internal design and architecture, HDFS is not suitable for handling high-throughput streams directly. Apache Flume makes the data loading process easy, efficient, and manageable.

Key Features

  • Streams data from multiple sources into Hadoop for analysis or for data warehousing.
  • Collects high-velocity, high-volume logs in real time.
  • Handles zig-zag flows (dynamic spikes) where the rate of incoming data sometimes exceeds the rate at which it can be written to the destination.
  • Provides guaranteed data delivery.
  • Scales horizontally to handle additional data volume as each agent runs independently.
  • Supports dynamic reconfiguration without a restart, which reduces downtime for Flume agents.
  • Deploys as a fully distributed system with no central coordination point; each agent runs independently of the others, with no inherent single point of failure.

Key Constituents

  • Event
    • An Event represents the fundamental unit of data transported by Flume from a source to its destination.
    • An event is a byte array payload accompanied by an optional set of string headers.
    • The payload is opaque to Flume; Flume cannot see or manipulate its contents.
    • Headers are specified as an unordered collection of string key-value pairs, with keys being unique across the collection.
    • Headers can be used for event routing.
  • Client
    • An entity that generates events and sends them to one or more agents. Ideally, the client runs as part of the application process.
    • Flume clients support RPC-based and HTTP-based interfaces.
    • Flume also supports custom clients developed using the client SDK (org.apache.flume.api).
    • It decouples Flume from the system where event data is generated.
  • Agent
    • A container/server for hosting the sources, channels, sinks and other components that enable the transportation of events from one place to another. It runs as an independent JVM.
    • It is basically a daemon that runs the configured Flume data-ingestion flows and components (a sample agent configuration is sketched at the end of this section).
    • It provides configuration, life-cycle management, and monitoring support for the hosted components.
    • Agents use transactional exchange to guarantee delivery across hops.
  • Source
    • A source receives events from a specialized location or mechanism (an external client, an upstream agent, a log stream, and so on).
    • It places the received events on one or more channels.
    • Main source types:
      • Avro source: Listens on an Avro port and receives events from external Avro client streams.
      • Thrift source: Listens on a Thrift port and receives events from external Thrift client streams.
      • Exec source: Runs a given Unix command on start-up and expects that process to continuously produce data on standard out. If the process exits for any reason, the source also exits and produces no further data.
      • JMS source: Reads messages from a JMS destination such as a queue or topic. Being a JMS application, it should (per the Flume documentation) work with any JMS-compliant provider, but so far it has been tested only with ActiveMQ. The JMS source provides a configurable batch size, message selector, user/password, and message-to-Flume-event converter.
      • Syslog source: Reads syslog data and generates Flume events. The UDP source treats an entire message as a single event, whereas the TCP source creates a new event for each string of characters separated by a newline.
      • HTTP source: Accepts Flume events via HTTP POST and GET. HTTP requests are converted into Flume events by a pluggable handler, which must implement the HTTPSourceHandler interface; the handler takes an HttpServletRequest and returns a list of Flume events.
    • A source requires at least one channel to function.
  • Channel
    • A channel is the holding area as events are passed from a source to a sink.
    • A passive component that buffers the incoming events until they are drained by Sinks.
    • Different channels offer different levels of durability (Memory, File, Database)
    • Channels are fully transactional.
    • Provides weak ordering guarantees
    • Can work with any number of Sources and Sinks
    • Durable channels ensure that data is never lost in transit.
    • Main channel types:
      • In-Memory channel: Events are stored in an in-memory queue with a configurable maximum size. Generally it is used for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure.
      • JDBC-based durable channel: The events are stored in a persistent storage that's backed by a database.
      • File-based durable channel: The events are persisted in a file for durability. It also supports encryption, i.e. the file data can be encrypted for security purposes.
      • Spillable Memory channel: The events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store. The disk store is managed using an embedded File channel. When the in-memory queue is full, additional incoming events are stored in the file channel.
      • Custom channel: A custom channel is your own implementation of the Channel interface. A custom channel's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom channel is its FQCN (fully qualified class name, i.e. the class name including its package).
  • Sink
    • An active component that removes events from a Channel and transmits them to their next hop destination.
    • Different types of sinks:
      • HDFS sink: This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files. It supports compression in both file types. The files can be rolled (close current file and create a new one) periodically based on the elapsed time or size of data or number of events.
      • Avro sink: Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair.
      • Thrift sink: Flume events sent to this sink are turned into Thrift events and sent to the configured hostname / port pair.
      • IRC sink: The IRC sink takes messages from the attached channel and relays them to configured IRC destinations.
      • LFS or File Roll sink: Stores events on the local filesystem.
      • Null sink: It discards all events it receives from the channel.
      • HBase sink: Sends events to an HBase server; the HBase configuration is picked up from the first hbase-site.xml encountered on the classpath.
      • Morphline Solr sink: This sink extracts data from Flume events, transforms it, and loads it in near-real-time into Apache Solr servers, which in turn serve queries to end users or search applications.
      • ElasticSearch sink: This sink writes data to an Elasticsearch cluster.
    • A sink requires exactly one channel to function.
  • Interceptor
    • Interceptors are applied to a source in a predetermined order to enable decorating and filtering of events where necessary.
    • Built-in interceptors allow adding headers such as timestamps, the hostname, and static markers.
    • Custom interceptors can inspect the event payload to create specific headers where necessary.
    • Default interceptors provided with Flume:
      • Timestamp interceptor: Inserts the time in milliseconds at which it processes the event. This interceptor inserts a header with key timestamp whose value is the relevant timestamp.
      • Host interceptor: Inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.
      • Static interceptor: Allows the user to append a static header with a static value to all events.
      • UUID interceptor: This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fad-b7a586fc1b97, which represents a 128-bit value.
      • Regex Filtering interceptor: This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression. The supplied regular expression can be used to include events or exclude events.
      • Regex Extractor interceptor: This interceptor extracts regex match groups using a specified regular expression and appends the match groups as headers on the event. It also supports pluggable serializers for formatting the match groups before adding them as event headers.
  • Channel Selector
    • A channel selector is used to select one or more channels from all the configured channels, based on preset criteria.
    • Built-in Channel Selectors:
      • Replicating: for duplicating the events to every configured channel.
      • Multiplexing: for routing events based on their headers (see the routing sketch at the end of this section).
    • Custom Channel Selectors can use dynamic criteria where needed.
  • Sink Processor
    • The Sink Processor is responsible for invoking one sink from a specified group of sinks.
    • Built-in Sink Processors:
      • Default sink processor: Accepts only a single sink; the user is not forced to create a processor (sink group) for a single sink.
      • Failover sink processor: Maintains a prioritized list of sinks, guaranteeing that as long as one sink is available, events will be processed (delivered). A failover setup is sketched at the end of this section.
      • Load balancing sink processor: Provides the ability to load-balance the flow over multiple sinks. It maintains an indexed list of active sinks over which the load must be distributed, and supports distributing the load using either a round_robin or a random selection mechanism. The selection mechanism defaults to ROUND_ROBIN but can be overridden via configuration. Custom selection mechanisms are supported via custom classes that inherit from AbstractSinkSelector.
Note that Flume uses channel-based transactions to guarantee reliable message delivery. When a message moves from one agent to another, two transactions are started: one on the agent that delivers the event and one on the agent that receives it. For the sending agent to commit its transaction, it must receive a success indication from the receiving agent, and the receiving agent returns a success indication only if its own transaction commits. This ensures guaranteed delivery semantics between the hops that the flow makes.
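
To make the wiring between these components concrete, here is a minimal agent configuration sketch in Flume's properties-file format: one Avro source feeding a durable file channel that is drained by an HDFS sink, with timestamp and host interceptors attached to the source. The agent name (agent1), the component names, bind address, port, local directories, and HDFS path are illustrative assumptions, not values required by Flume.

    # Name the components of this agent (agent1 is an assumed agent name)
    agent1.sources  = avro-src
    agent1.channels = file-ch
    agent1.sinks    = hdfs-sink

    # Avro source: listens for events from Avro clients or upstream agents
    agent1.sources.avro-src.type     = avro
    agent1.sources.avro-src.bind     = 0.0.0.0
    agent1.sources.avro-src.port     = 41414
    agent1.sources.avro-src.channels = file-ch

    # Interceptors: stamp each event with a timestamp and the agent hostname
    agent1.sources.avro-src.interceptors           = ts host
    agent1.sources.avro-src.interceptors.ts.type   = timestamp
    agent1.sources.avro-src.interceptors.host.type = host

    # File channel: durable buffering between source and sink (assumed directories)
    agent1.channels.file-ch.type          = file
    agent1.channels.file-ch.checkpointDir = /var/flume/checkpoint
    agent1.channels.file-ch.dataDirs      = /var/flume/data

    # HDFS sink: writes events into HDFS, rolling files by time and size
    agent1.sinks.hdfs-sink.type              = hdfs
    agent1.sinks.hdfs-sink.channel           = file-ch
    agent1.sinks.hdfs-sink.hdfs.path         = hdfs://namenode:8020/flume/events/%Y-%m-%d
    agent1.sinks.hdfs-sink.hdfs.fileType     = DataStream
    agent1.sinks.hdfs-sink.hdfs.rollInterval = 300
    agent1.sinks.hdfs-sink.hdfs.rollSize     = 134217728
    agent1.sinks.hdfs-sink.hdfs.rollCount    = 0

The timestamp interceptor is what allows the %Y-%m-%d escape in the HDFS path to be resolved; without it the sink would have no timestamp header to bucket events by.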
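
A second sketch shows a multiplexing channel selector and a failover sink processor working together. Again, the agent name (agent2), the priority header and its values, the ports, and the paths are assumptions made up for this illustration.

    # Assumed agent name agent2; routes on an assumed "priority" event header
    agent2.sources    = http-src
    agent2.channels   = ch-critical ch-normal
    agent2.sinks      = hdfs-primary hdfs-backup roll-normal
    agent2.sinkgroups = sg1

    # HTTP source with a multiplexing channel selector
    agent2.sources.http-src.type     = http
    agent2.sources.http-src.port     = 8080
    agent2.sources.http-src.channels = ch-critical ch-normal
    agent2.sources.http-src.selector.type         = multiplexing
    agent2.sources.http-src.selector.header       = priority
    agent2.sources.http-src.selector.mapping.high = ch-critical
    agent2.sources.http-src.selector.default      = ch-normal

    # A durable channel for critical events, an in-memory channel for the rest
    agent2.channels.ch-critical.type          = file
    agent2.channels.ch-critical.checkpointDir = /var/flume/critical/checkpoint
    agent2.channels.ch-critical.dataDirs      = /var/flume/critical/data
    agent2.channels.ch-normal.type            = memory
    agent2.channels.ch-normal.capacity        = 10000

    # Failover sink processor: prefer hdfs-primary, fall back to hdfs-backup
    agent2.sinkgroups.sg1.sinks                           = hdfs-primary hdfs-backup
    agent2.sinkgroups.sg1.processor.type                  = failover
    agent2.sinkgroups.sg1.processor.priority.hdfs-primary = 10
    agent2.sinkgroups.sg1.processor.priority.hdfs-backup  = 5

    # Both sinks in the group drain the critical channel; a file_roll sink drains the rest
    agent2.sinks.hdfs-primary.type      = hdfs
    agent2.sinks.hdfs-primary.channel   = ch-critical
    agent2.sinks.hdfs-primary.hdfs.path = hdfs://nn1:8020/flume/critical
    agent2.sinks.hdfs-backup.type       = hdfs
    agent2.sinks.hdfs-backup.channel    = ch-critical
    agent2.sinks.hdfs-backup.hdfs.path  = hdfs://nn2:8020/flume/critical
    agent2.sinks.roll-normal.type       = file_roll
    agent2.sinks.roll-normal.channel    = ch-normal
    agent2.sinks.roll-normal.sink.directory = /var/flume/normal

Events whose priority header equals high land on the durable file channel and are delivered by whichever HDFS sink in the failover group is currently healthy; everything else takes the cheaper in-memory path.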

Event Flows

Typical Flume Event Flow
  • Sending Data
    • Clients send events to agents.
    • A source operating within the agent receives the events.
    • The source passes the received events through its interceptors and puts them on the channel(s) identified by the configured channel selector. Note that event population in the channel is transactional.
  • Data Draining (Internal processing for next step)
    • The sink processor identifies one of the configured sinks and invokes it.
    • The invoked sink takes events from its configured channel and sends them to the next-hop destination; note that event removal from the channel is transactional.
    • In case the transmission of events fails, the sink processor can take a secondary action.
  • Data Transmission
    • Agents transmit events to next-hop destinations (a two-hop sketch follows this list).
    • In case of transmission failure, the agent that could not deliver the events retains them in its channel, so no data is lost.
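
To illustrate multi-hop transmission, the sketch below adds an assumed edge-tier agent that tails a local log file with an exec source and forwards events over Avro to the collector agent sketched earlier (the hostnames, port, and tailed file are assumptions). The Avro sink's take from its channel and the downstream Avro source's put are each wrapped in a channel transaction, which is how the hop-by-hop delivery guarantee described above plays out in practice.

    # Edge-tier agent (assumed name "edge") tailing an application log
    edge.sources  = tail-src
    edge.channels = mem-ch
    edge.sinks    = avro-fwd

    # Exec source: run tail -F and turn each output line into an event
    edge.sources.tail-src.type     = exec
    edge.sources.tail-src.command  = tail -F /var/log/app/app.log
    edge.sources.tail-src.channels = mem-ch

    # In-memory channel: fast, but staged events are lost if the agent dies
    edge.channels.mem-ch.type     = memory
    edge.channels.mem-ch.capacity = 10000

    # Avro sink: the next hop is the Avro source of the collector agent above
    edge.sinks.avro-fwd.type     = avro
    edge.sinks.avro-fwd.channel  = mem-ch
    edge.sinks.avro-fwd.hostname = collector.example.com
    edge.sinks.avro-fwd.port     = 41414

Each agent would be started with the flume-ng agent command, passing the agent name and the path to its properties file.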
We will see how to set up a Flume agent and send events to HDFS via an Avro source in the next post.