Thursday, September 18, 2014

Flume | Setup

We learnt about Flume in the previous post. Here we will set up and run a Flume agent with an Avro source and a Java-based client.

Installation

For the agent installation we will use one of the three nodes set up earlier for the agent JVM, and all three HDFS nodes for the sink (as described in an earlier post).
We will set up a single agent:

Note that we are using the following details for installation (for the complete setup):

     - Installation base directory:  
      • /home/anishsneh/installs
     - Installation user name:
      • anishsneh
     - Hostnames: 
      • server01
Steps to install Flume NG agent:
  1. Install Flume - we will use Apache Flume 1.5.0.1 (with Hadoop2)
    • Download apache-flume-1.5.0.1-bin.tar.gz from Flume Website, note that we are using Hadoop 2 for sink
    • Extract downloaded package to anishsneh@server01:/home/anishsneh/installs, such that we have:
           [anishsneh@server01 installs]$ ls -ltr apache-flume-1.5.0.1-bin
           total 128
           -rw-r--r--.  1 anishsneh anishsneh  1779 Mar 28 15:15 README
           -rw-r--r--.  1 anishsneh anishsneh  6172 Mar 28 15:15 DEVNOTES
           -rw-r--r--.  1 anishsneh anishsneh 22517 May  6 16:29 LICENSE
           -rw-r--r--.  1 anishsneh anishsneh 61591 Jun 10 13:56 CHANGELOG
           -rw-r--r--.  1 anishsneh anishsneh   249 Jun 10 14:08 NOTICE
           -rw-r--r--.  1 anishsneh anishsneh  1591 Jun 10 14:08 RELEASE-NOTES
           drwxr-xr-x. 10 anishsneh anishsneh  4096 Jun 10 15:10 docs
           drwxrwxr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 lib
           drwxrwxr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 tools
           drwxr-xr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 bin
           drwxr-xr-x.  2 anishsneh anishsneh  4096 Sep 17 14:59 conf
          
    • Create hdfs://server01:9000/data/flume directory on HDFS and change its permissions to 777 on server01 (for this demo)
           [anishsneh@server01 installs]$ hadoop fs -mkdir /data/flume
          
           [anishsneh@server01 installs]$ hadoop fs -chmod 777 /data/flume
          

Saturday, September 13, 2014

Kafka | Setup

We learnt about Kafka in the previous post. Here we will set up and run a three-node Kafka cluster (fully distributed).

Installation

For installation we will use the three CentOS VMs which we configured in an earlier post. We will set up a three-node Kafka cluster.

Note that here we are using following details for installation (for complete Kafka setup):

     - Installation base directory:  
      • /home/anishsneh/installs
     - Installation user name:
      • anishsneh
     - Hostnames: 
      • server01 (broker 1)
      • server02 (broker 2)
      • server03 (broker 3)
Steps to install Kafka:
  1. Install Kafka - we will use Kafka 0.8.1.1 (kafka_2.10-0.8.1.1.tgz) built using Scala 2.10
    • Download kafka_2.10-0.8.1.1.tgz from Apache Kafka webpage, note that we are using Kafka 0.8.1.1 which is compiled using Scala 2.10
    • Extract downloaded package to /home/anishsneh/installs, such that we have:
      [anishsneh@server01 installs]$ ls -ltr kafka_2.10-0.8.1.1
      total 28
      -rw-rw-r--. 1 anishsneh anishsneh   162 Apr 22 11:37 NOTICE
      -rw-rw-r--. 1 anishsneh anishsneh 11358 Apr 22 11:37 LICENSE
      drwxr-xr-x. 2 anishsneh anishsneh  4096 Apr 22 12:26 libs
      drwxr-xr-x. 2 anishsneh anishsneh  4096 Apr 22 12:26 config
      drwxr-xr-x. 3 anishsneh anishsneh  4096 Apr 22 12:26 bin
      
    • Repeat above steps for all the three hosts.

Tuesday, September 9, 2014

HBase | Phoenix

We set up and started an HBase cluster in the previous post. Here we will write a JDBC-based client using Apache Phoenix.

Apache Phoenix

Apache Phoenix is a JDBC skin on the HBase client which turns HBase into a SQL-capable database. The driving force behind Phoenix development was to use a well-understood language like SQL to make it easier for people to use HBase instead of learning another proprietary API. It was originally developed by salesforce.com as a Java/JDBC layer enabling developers to run SQL queries on Apache HBase; later it was open sourced and moved under the Apache umbrella.
As per Apache documentation "Apache Phoenix is a SQL skin over HBase delivered as a client-embedded JDBC driver targeting low latency queries over HBase data. Apache Phoenix takes your SQL query, compiles it into a series of HBase scans and orchestrates the running of those scans to produce regular JDBC result sets."
It is written entirely in Java and provides a client-embeddable JDBC driver. It has its own query engine, co-processors and metadata. Phoenix is used internally by salesforce.com for low latency queries, in the order of milliseconds for simple queries or seconds when tens of millions of rows are processed, according to the project's description.
The Phoenix query engine transforms a SQL query into HBase scans, executes them using co-processors and produces JDBC result sets. Under the hood it compiles queries into native HBase calls (there is NO MapReduce involved).
Note that Phoenix JDBC is developed for HBase and works with HBase ONLY.
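
Because Phoenix is exposed through the standard java.sql API, a client can be sketched roughly as follows. This is only a minimal sketch under assumptions: the Phoenix client jar is on the classpath, ZooKeeper for the HBase cluster is reachable at server01:2181, and the table name DEMO_USERS is hypothetical. The program connects only when a ZooKeeper host is passed as an argument, so the URL helper can be tried without a cluster.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

class PhoenixClientSketch {
    // Phoenix JDBC URLs have the form jdbc:phoenix:<zookeeper quorum>[:<port>]
    static String phoenixUrl(String zkQuorum, int zkPort) {
        return "jdbc:phoenix:" + zkQuorum + ":" + zkPort;
    }

    // Creates a hypothetical table, writes one row and reads it back.
    static void demo(String url) throws SQLException {
        try (Connection conn = DriverManager.getConnection(url)) {
            try (Statement st = conn.createStatement()) {
                st.executeUpdate("CREATE TABLE IF NOT EXISTS DEMO_USERS "
                        + "(ID BIGINT NOT NULL PRIMARY KEY, NAME VARCHAR)");
            }
            // Phoenix uses UPSERT instead of separate INSERT/UPDATE statements.
            try (PreparedStatement ps = conn.prepareStatement(
                    "UPSERT INTO DEMO_USERS (ID, NAME) VALUES (?, ?)")) {
                ps.setLong(1, 1L);
                ps.setString(2, "anish");
                ps.executeUpdate();
            }
            conn.commit(); // explicit commit, assuming auto-commit is off
            try (Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery("SELECT ID, NAME FROM DEMO_USERS")) {
                while (rs.next()) {
                    System.out.println(rs.getLong("ID") + " " + rs.getString("NAME"));
                }
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        // "server01" and port 2181 are assumptions about the cluster layout.
        String url = phoenixUrl(args.length > 0 ? args[0] : "server01", 2181);
        System.out.println("Phoenix URL: " + url);
        if (args.length > 0) {
            demo(url); // only attempted when a ZooKeeper host is given
        }
    }
}
```

Everything after the connection is plain JDBC, which is the main point of Phoenix: the HBase scans are generated behind the driver.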

Sunday, September 7, 2014

HBase | Setup

We learnt about HBase in the previous post. Here we will set up an HBase cluster (fully distributed) and run a client on it.

Installation

For installation we will use the three-node Hadoop YARN cluster (as set up and described in an earlier post).
We will set up a three-node HBase cluster:

Note that here we are using following details for installation (for complete setup):

     - Installation base directory:  
      • /home/anishsneh/installs
     - Installation user name:
      • anishsneh
     - Hostnames: 
      • server01 (master+slave)
      • server02 (only slave)
      • server03 (only slave)
Steps to install HBase (on the top of Hadoop 2 cluster):
  1. Install HBase - we will use HBase 0.98.4 (Hadoop2)
    • Download hbase-0.98.4-hadoop2-bin.tar.gz from HBase Website, note that we are using Hadoop 2 version of HBase binary
    • Extract downloaded package to /home/anishsneh/installs, such that we have:
      [anishsneh@server01 installs]$ ls -ltr hbase-0.98.4-hadoop2
      total 172
      -rw-r--r--.  1 anishsneh anishsneh    897 Jun  6 10:33 NOTICE.txt
      -rw-r--r--.  1 anishsneh anishsneh  11358 Jun  6 10:33 LICENSE.txt
      -rw-r--r--.  1 anishsneh anishsneh   1377 Jul 14 18:23 README.txt
      drwxr-xr-x.  2 anishsneh anishsneh   4096 Jul 14 18:23 conf
      drwxr-xr-x.  4 anishsneh anishsneh   4096 Jul 14 18:23 bin
      -rw-r--r--.  1 anishsneh anishsneh 134544 Jul 14 18:27 CHANGES.txt
      drwxr-xr-x.  7 anishsneh anishsneh   4096 Jul 14 19:37 hbase-webapps
      drwxr-xr-x. 29 anishsneh anishsneh   4096 Jul 14 19:45 docs
      drwxrwxr-x.  3 anishsneh anishsneh   4096 Sep  7 14:47 lib
      
    • Repeat above steps for all the three hosts.
    • Create hdfs://server01:9000/data/hbase directory on HDFS and change its permissions to 777 (for this demo)
    • Create /home/anishsneh/installs/tmp/hbase directory on LFS in all of the three servers (i.e. server01, server02, server03)

Friday, August 29, 2014

Spring Integration | Demo

As explained in the previous post, a Spring Integration application can run inside any Spring application container. It may be a web application (loading the context via web.xml) or a standalone application (loading the context via ClassPathXmlApplicationContext.class).

We will start with a quick file watcher example in which we will implement a file adaptor which will process incoming files asynchronously.

We will run the application as a Java application using main() function and ClassPathXmlApplicationContext.class

Our project will have following structure:
 

Saturday, July 19, 2014

Hadoop | MapReduce API

MapReduce is a massively scalable, parallel processing framework that works in tandem with HDFS. MapReduce and Hadoop enable compute to execute at the location of the data, rather than moving data to the compute location; data storage and computation coexist on the same physical nodes in the cluster.

Hadoop MapReduce is an open source implementation of MapReduce

MapReduce (older API implementation with Hadoop 0.20 - just for conceptual reference)

Friday, July 18, 2014

Hadoop | Setup

In previous posts we saw the basic concepts of Hadoop, i.e. HDFS & MapReduce. Now we will set up Hadoop in all three modes (standalone, pseudo-distributed & fully distributed).

Installation

For installation we will need a single machine for the standalone and pseudo-distributed modes. For the fully distributed mode we will use three machines (CentOS 6.5 x86_64 VMs, running on VMware Player). First we will set up the cluster for all three modes.

Note that we are using the following details for installation (for the complete setup):

     - Installation base directory:  
      • /home/anishsneh/installs
     - Installation user name:
      • anishsneh
     - Hostnames: 
      • server01 (master+slave)
      • server02 (only slave)
      • server03 (only slave)
Steps to install Hadoop 2:
  1. Install Java - we will use JDK 1.7
    • Download 64 bit JDK 1.7 from Oracle Website, since we are using 64 bit OS here (Linux x64, jdk-*-linux-x64.tar.gz)
    • Extract downloaded package to /home/anishsneh/installs, such that we have:
      [anishsneh@server01 installs]$ ls -ltr jdk1.7.0_51
      total 19768
      -rw-r--r--. 1 anishsneh anishsneh   123324 Dec 18  2013 THIRDPARTYLICENSEREADME-JAVAFX.txt
      -r--r--r--. 1 anishsneh anishsneh   173559 Dec 18  2013 THIRDPARTYLICENSEREADME.txt
      -r--r--r--. 1 anishsneh anishsneh      114 Dec 18  2013 README.html
      -r--r--r--. 1 anishsneh anishsneh       40 Dec 18  2013 LICENSE
      -r--r--r--. 1 anishsneh anishsneh     3339 Dec 18  2013 COPYRIGHT
      -rw-r--r--. 1 anishsneh anishsneh 19895644 Dec 18  2013 src.zip
      -rw-r--r--. 1 anishsneh anishsneh      499 Dec 18  2013 release
      drwxr-xr-x. 4 anishsneh anishsneh     4096 Apr 15 14:21 man
      drwxr-xr-x. 5 anishsneh anishsneh     4096 Apr 15 14:21 jre
      drwxr-xr-x. 5 anishsneh anishsneh     4096 Apr 15 14:21 lib
      drwxr-xr-x. 4 anishsneh anishsneh     4096 Apr 15 14:21 db
      drwxr-xr-x. 2 anishsneh anishsneh     4096 Apr 15 14:21 bin
      drwxr-xr-x. 3 anishsneh anishsneh     4096 Apr 15 14:21 include
      
    • Repeat above steps for all the three hosts.

Pig | High Level MapReduce

Pig is a high-level platform/scripting language for creating and running MapReduce programs on Hadoop. The language used in this platform is called Pig Latin. Pig Latin allows users to write complex MapReduce transformations using a simple scripting language. The language defines a set of transformations on a data set, such as aggregate, join and sort, conceptually similar to SQL for RDBMS systems.

Pig has a rich set of functions. Pig Latin can also be extended using UDFs (User Defined Functions), which can be written in Java, Python, JavaScript, Ruby or Groovy as per custom requirements and then called directly from the language. A detailed list of Pig built-in functions can be found here.

Pig is complete in that you can do all the required data manipulations in Apache Hadoop with Pig. It simplifies complex operations such as joins and filters when querying data in Hadoop.

Wide usage of Pig includes:
  • Extract Transform Load (ETL), e.g. processing large amounts of log data, cleaning bad entries, joining with other data sets.
  • Research on "raw" information, e.g. user audit logs where the schema may be unknown or inconsistent.

Hadoop2 vs Hadoop1

Hadoop2 was a complete overhaul of Hadoop1. In Hadoop2 the ASF introduced MapReduce 2.0 (MRv2), also known as Apache Hadoop YARN. It is a sub-project of Hadoop at the Apache Software Foundation, introduced in Hadoop 2.0, that separates the resource management and processing components.

The main differences are categorized below:

Daemons
  • HDFS daemons
    • Hadoop 1: NameNode, SecondaryNamenode, DataNode
    • Hadoop 2: NameNode, CheckpointNode (similar to former SecondaryNamenode), DataNode
  • Processing daemons
    • Hadoop 1 (MR1): JobTracker, TaskTracker
    • Hadoop 2 (MR2/YARN): ResourceManager [one per cluster], NodeManager [many per cluster, one per node], Application Master [many per cluster]
 

Wednesday, July 16, 2014

Hadoop | MapReduce

In the previous post, we looked at the first component of the Hadoop framework, i.e. HDFS, and its key features. Now we will look at the concepts behind the MapReduce framework.

Before we start with MapReduce, we need to understand a very important core concept of Hadoop (HDFS+MapReduce) framework known as Data Locality.
  • The data locality concept can be described as "bringing the compute to the data." In other words, whenever we use a MapReduce program on a particular part of HDFS data, we want to run that program on the node, or machine, that actually stores this data in HDFS. Doing so allows processes to run much faster since it saves us from moving large amounts of data around.
  • Hadoop tends to run map tasks on nodes where the data is present locally, to avoid network and inter-node communication latency as much as possible. Since the input data is split into pieces and provided to map tasks, it is preferred that all the data given to a map task is available on a single node.
Since HDFS only guarantees that data up to its block size (64 MB by default in Hadoop 1, 128 MB in Hadoop 2) is present on one node, it is advisable to keep the split size equal to the HDFS block size so that each map task can take advantage of data locality.
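
As a rough illustration of the split-size advice above, the following sketch counts how many splits (and therefore map tasks) a file produces when the split size equals the block size. The class and method names are hypothetical and not part of the Hadoop API. The real split calculation takes more inputs (such as minimum and maximum split sizes), so treat this as arithmetic only.

```java
// Hypothetical helper, not part of the Hadoop API: plain arithmetic only.
class SplitMath {
    // Number of splits when split size == block size (ceiling division).
    static long splitCount(long fileSizeBytes, long blockSizeBytes) {
        if (fileSizeBytes < 0 || blockSizeBytes <= 0) {
            throw new IllegalArgumentException("file size must be >= 0 and block size > 0");
        }
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 64 * mb;  // block size used in this post
        long fileSize = 200 * mb;  // example input file
        // Three full 64 MB blocks plus one 8 MB remainder block
        System.out.println(splitCount(fileSize, blockSize) + " splits");
    }
}
```

With one split per block, each map task can be scheduled on a node that already holds its whole input.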

MapReduce is a framework/programming-model that allows developers to write programs that process massive amounts of unstructured data in parallel across a distributed cluster of processors or stand-alone computers.
A MapReduce program is composed of a map() function that performs filtering and sorting (such as sorting students by first name into queues, one queue for each name) and a reduce() function that performs a summary operation.
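
To make the map() and reduce() roles concrete, here is a minimal single-JVM sketch of a word count. It deliberately does not use the Hadoop API: the class name and the in-memory "shuffle" step are assumptions made for illustration, and a real job would distribute the same two functions across the cluster.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WordCountSketch {
    // map(): emit a (word, 1) pair for every word in one input line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // reduce(): summarise all values collected for one key.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Driver: map every line, group values by key (the "shuffle"), reduce each group.
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(line)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {big=2, cluster=1, data=2, node=1}
        System.out.println(run(Arrays.asList("big data big cluster", "data node")));
    }
}
```

The grouping step in run() stands in for the shuffle and sort that the framework performs between the map and reduce phases.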

A typical Hadoop usage pattern involves three stages:
  • Loading data into HDFS
  • MapReduce operations.
  • Retrieving results from HDFS.
Let's start with Apache Hadoop MapReduce concepts.

Tuesday, July 15, 2014

Hive | Setup & Hands On

We learnt about Hive in the previous post. Here we will set up and run Hive (with a MySQL-based metastore). Note that we will use one of the machines used in the previous post, hence the JAVA_HOME and HADOOP_HOME related variables are assumed to be set in ~/.bashrc

Installation

For installation we will use the master node (in this demo we will run HiveQL using the Hive command-line console only, hence we will install on the master node only). We will use the following details for installation:
  • Installation base directory:
    • /home/anishsneh/installs
  • Installation user name:
    • anishsneh
  • Hadoop details (will use same Hadoop Cluster configured in previous post) with address:
    •  hdfs://server01:9000
  • We will use MySQL based metastore with following details (note that we need to install/configure a MySQL server with following details):
    • Server name: server01
    • Server URL: jdbc:mysql://localhost:3306
    • Database name: hive
    • MySQL username: hiveuser
    • MySQL password: Welcome1hive
Let's install and configure metastore database and Hive:

  • Install Metastore Database
    • Install MySQL server on server01 (using yum or any convenient method)
    • Use root password as "Welcome1root"
  • Configure Metastore Database
    • Connect and configure MySQL metastore as follows:
      [anishsneh@server01 installs]$ mysql -uroot -pWelcome1root
      Warning: Using a password on the command line interface can be insecure.
      Welcome to the MySQL monitor.  Commands end with ; or \g.
      Your MySQL connection id is 5
      Server version: 5.6.17 MySQL Community Server (GPL)
      
      Copyright (c) 2000, 2014, Oracle and/or its affiliates. All rights reserved.
      
      Oracle is a registered trademark of Oracle Corporation and/or its
      affiliates. Other names may be trademarks of their respective
      owners.
      
      Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
      
      mysql> create database hive;
      Query OK, 1 row affected (0.26 sec)
      
      mysql> create user 'hiveuser'@'%' IDENTIFIED BY 'Welcome1hive';
      Query OK, 0 rows affected (0.07 sec)
      
      mysql> GRANT all on *.* to 'hiveuser'@localhost identified by 'Welcome1hive';
      Query OK, 0 rows affected (0.00 sec)
      
      mysql> flush privileges;
      Query OK, 0 rows affected (0.00 sec)
      

Friday, July 11, 2014

HBase | Distributed, Scalable, NoSQL

HBase is an open source, non-relational, column-oriented distributed database modeled after Google's BigTable. It is written in Java and runs on top of HDFS. It can be seen as a distributed, multidimensional, sparse, sorted map. It provides realtime random read/write access to data stored in HDFS.
HBase falls under the "NoSQL" umbrella. Technically speaking, HBase is really more a "Data Store" than a "Data Base" because it lacks many of the features we find in an RDBMS, such as typed columns, secondary indexes, triggers, and advanced query languages.
HBase is not a direct replacement for a classic SQL database, although recently its performance has improved. HBase has many features which support efficient scaling. HBase clusters expand by adding RegionServers that are hosted on commodity class servers (running on top of Hadoop). If a cluster expands from 10 to 20 RegionServers, for example, it doubles in terms of both storage and processing capacity.
HBase is mainly used to power websites/products (e.g. StumbleUpon and Facebook's Messages), storing data that is also used as a sink or a source for analytical jobs (usually MapReduce).
Before we select HBase in our application, we need to keep following things in mind:
  • We need to make sure that we have enough data. If we have hundreds of millions or billions of rows, then HBase is a good candidate. If we only have a few thousand/million rows, then using a traditional RDBMS might be a better choice due to the fact that all of our data might wind up on a single node (or two) and the rest of the cluster may be sitting idle.
  • We need to make sure we can live without all the extra features that an RDBMS provides (e.g. typed columns, secondary indexes, transactions, advanced query languages etc.) An application built against an RDBMS cannot be "ported" to HBase by simply changing a JDBC driver, for example. Consider moving from an RDBMS to HBase as a complete redesign as opposed to a port.
  • We need to make sure we have enough hardware; even HDFS doesn't do well with anything less than 5 DataNodes. HBase can run quite well stand-alone on a laptop, but this should be considered a development configuration only.
HBase itself is a vast implementation, so we will cover only the basic concepts here. We will also set up a fully distributed HBase cluster running on top of HDFS (three nodes) and run fundamental CRUD operations.
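
The "multidimensional, sparse, sorted map" description above can be sketched with nested sorted maps. This is a conceptual model only, not the HBase client API: the class and method names are hypothetical, and Strings are used for readability where HBase stores byte arrays.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class SortedMapModel {
    // row key -> "family:qualifier" -> timestamp (newest first) -> value
    private final NavigableMap<String, NavigableMap<String, NavigableMap<Long, String>>> rows =
            new TreeMap<>();

    // Write one versioned cell; rows and columns are created on demand.
    void put(String row, String column, long timestamp, String value) {
        rows.computeIfAbsent(row, r -> new TreeMap<>())
            .computeIfAbsent(column, c -> new TreeMap<>(Comparator.<Long>reverseOrder()))
            .put(timestamp, value);
    }

    // Latest version of a cell, or null when the cell was never written (sparse).
    String get(String row, String column) {
        NavigableMap<String, NavigableMap<Long, String>> columns = rows.get(row);
        if (columns == null || !columns.containsKey(column)) {
            return null;
        }
        return columns.get(column).firstEntry().getValue();
    }

    // Range scan over the sorted row keys: startRow inclusive, stopRow exclusive.
    List<String> scan(String startRow, String stopRow) {
        return new ArrayList<>(rows.subMap(startRow, true, stopRow, false).keySet());
    }

    public static void main(String[] args) {
        SortedMapModel table = new SortedMapModel();
        table.put("row2", "info:name", 1L, "second");
        table.put("row1", "info:name", 1L, "old");
        table.put("row1", "info:name", 2L, "new");
        System.out.println(table.get("row1", "info:name"));  // newest version wins
        System.out.println(table.get("row1", "info:email")); // missing cell, nothing stored
        System.out.println(table.scan("row1", "row3"));      // row keys come back sorted
    }
}
```

Missing cells cost nothing to store, which is what "sparse" means here, and keeping row keys sorted is what makes range scans cheap.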

Key Features

  • Strongly consistent and random reads/writes. 
  • Horizontal scalability and Automatic sharding. 
  • Automatic RegionServer failover. 
  • Hadoop/HDFS Integration with MapReduce.
  • Native Java Client API (HTable). 
  • Support for Filters, Operational Management. 
  • Multiple clients: its native Java library, Thrift, and REST. 
  • A few third party clients (each of them has its own advantages):
    • Apache Phoenix (JDBC layer for HBase)
    • StumbleUpon Asynchbase (asynchronous, non-blocking)
    • Kundera (JPA 1.0 ORM library)

Hive | High Level MapReduce

Apache Hive is a data warehouse solution built on top of HDFS to facilitate querying and managing large datasets present in HDFS and compatible file systems such as the Amazon S3 filesystem.

It is a query engine wrapper built on top of MapReduce and is considered the default data warehousing tool of the Hadoop ecosystem. It provides HiveQL, which is very similar to SQL. Hive hides the complexity of Hadoop from end users.

HiveQL is a SQL like language which supports JDBC drivers and interactive SQL queries for large volumes of data in HDFS.

Hive provides external interfaces like command line (CLI) and web UI, and application programming interfaces (API) like JDBC and ODBC.

It was originally developed by Facebook to manage their large volume datasets and later contributed to Apache. When we think of Hadoop data warehousing, Hive becomes an important constituent of the Hadoop ecosystem.

HiveQL provides support for ad hoc queries, schema on read, and transparent conversion of queries to map/reduce (as per the underlying job infrastructure, e.g. Hadoop MapReduce, Spark or Tez). Hive compiles queries into MapReduce jobs and runs them on the Hadoop cluster.

Hive is considered as a standard for interactive SQL queries over petabytes of data in Hadoop. It easily integrates with other data center technologies via standard JDBC interface. It needs a metastore which stores the metadata for Hive tables and partitions in a relational database, and provides clients access to this information via the metastore service API.

Note that Hive does NOT provide low latency or real-time queries; in Hive even small queries may take minutes. It is designed for scalability and ease of use rather than low latency responses.

Key Features

  • HiveQL based SQL like interactive queries with JDBC support.
  • HiveQL supports FILTERS, JOINS, ORDER BY, GROUP BY clauses out of the box.
  • HiveQL allows traditional map/reduce programmers to plug in custom mappers and reducers when it is difficult to write respective queries/logic in HiveQL.
  • Different storage types support such as plain text, RCFile, HBase, ORC, and others.
  • Metastore or Metadata storage in an RDBMS, significantly reducing the time to perform semantic checks during query execution.
  • Transparently converts queries to map/reduce jobs and runs them on the Hadoop cluster.
  • Supports schema on read.
  • Supports user defined functions (UDF - written in Java and referenced by a HiveQL query) to handle use-cases not supported by built-in functions.
  • Supports complex data types such as STRUCT, MAP, ARRAY.
  • Supports indexing to accelerate retrievals; index types include compact and bitmap indexes.

Flume | Streaming Events

Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of streaming event data into the Hadoop Distributed File System - HDFS (or possibly some other centralized data store). It is robust and fault tolerant, with tunable reliability mechanisms for failover and recovery.
Flume uses Sources and Sinks abstractions and a pipeline data flow to link them together.

Flume - A High Level View

Why Flume

In a big data deployment, data analysis is the second part of the game; it is applicable only if we are able to collect the data first. Getting the data into a Hadoop cluster is the first step for any big data platform.
Given the slow nature of HDFS, it is easy to load slowly moving big chunks of data into HDFS directly, but when it comes to handling fast moving data at much higher velocities, HDFS alone is not the right choice, or rather is not self-sufficient.
Hence the need for a middleman that can handle fast incoming streams and, under the hood, keeps flushing them to HDFS.

When we talk about loading data into HDFS we should keep a few things in mind:
  • HDFS is not a real filesystem in the traditional sense. In HDFS a file exists only as a directory entry and shows as having zero length until the file is closed. This means that if data is written to a file for an extended period without closing it, a network disconnect with the client will leave us with nothing but an empty file for all our efforts. This may lead to the conclusion that it would be wise to write small files so that we can close them as soon as possible.
  • Another problem is that Hadoop doesn't like lots of tiny files. As the HDFS metadata is kept in memory on the NameNode, the more files we create, the more primary memory/RAM we will use. From a MapReduce perspective, too, writing tiny files leads to poor efficiency.
  • We can load data into Hadoop using the HDFS APIs or commands. This works for data that is ready to upload, when we have all of it in hand. Since Hadoop is designed to handle batch operations, dealing with real time streaming data is a bit problematic. Suppose we have a system which is generating data at a massive rate. What should be the batch frequency of loading data into HDFS? It might vary as per the need, but we would certainly not have the right solution in place using a batch approach.
As we see, due to its internal design/architecture HDFS is not suitable for handling high-throughput streams directly. Apache Flume makes the data loading process easy, efficient and manageable.
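
The "middleman" idea above can be sketched as a bounded buffer that absorbs bursts from a fast producer while a slower consumer drains it in batches, so that each batch can be appended to one larger file instead of many tiny ones. This is a conceptual, single-threaded sketch with hypothetical names, not Flume's own implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class BufferSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int capacity;

    BufferSketch(int capacity) {
        this.capacity = capacity;
    }

    // Producer side: returns false when the buffer is full so the caller can retry later.
    boolean offer(String event) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.addLast(event);
        return true;
    }

    // Consumer side: remove up to batchSize events in arrival order.
    List<String> drain(int batchSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !queue.isEmpty()) {
            batch.add(queue.removeFirst());
        }
        return batch;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        BufferSketch buffer = new BufferSketch(5);
        int accepted = 0;
        for (int i = 0; i < 7; i++) {           // a burst larger than the capacity
            if (buffer.offer("event-" + i)) {
                accepted++;
            }
        }
        System.out.println("accepted=" + accepted);           // 5 of 7 fit in the buffer
        System.out.println("batch=" + buffer.drain(3));       // first three events, in order
        System.out.println("still buffered=" + buffer.size());
    }
}
```

The capacity bounds memory use, and the rejected offers are the back-pressure signal a producer would act on.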

Key Features

  • Streams data from multiple sources into Hadoop for analysis or data warehousing.
  • Collects high-velocity, high-volume logs in real time.
  • Handles zig-zag flows (dynamic spikes) where the rate of incoming data sometimes exceeds the rate at which data can be written to the destination.
  • Provides guaranteed data delivery.
  • Scales horizontally to handle additional data volume as each agent runs independently.
  • Supports dynamic reconfiguration without the need for a restart, which allows for reduction in the downtime for flume agents.
  • Deployed as fully distributed with no central coordination point. Each agent runs independent of others with no inherent single point of failure.

Key Constituents

  • Event
    • An Event represents the fundamental unit of data transported by Flume from a source to its destination.
    • Event is a byte array payload having optional string headers.
    • Payload is just like a binary to Flume, it cannot see or manipulate payloads.
    • Headers are specified as an unordered collection of string key-value pairs, with keys being unique across the collection.
    • Headers can be used for event routing.
  • Client
    • An entity that generates events and sends them to one or more Agents. Ideally a client runs as a part of the application process.
    • Flume clients support RPC-based and HTTP-based interfaces.
    • Flume also supports custom clients developed using the client SDK (org.apache.flume.api).
    • It decouples Flume from the system where event data is generated.
  • Agent
    • A container/server for hosting sources, channels, sinks and other components that enable the transportation of events from one place to another. Runs as an independent JVM
    • It is basically a daemon running for setting up Flume data ingestion flows and components.
    • It provides Configuration, life-cycle Management, monitoring support for hosted components.
    • Agents use transactional exchange to guarantee delivery across hops.
  • Source
    • It writes events to one or more channels.
    • It receives events from a specialized location or mechanism and places it on one or more channels.
    • Main source types:
      • Avro source: Listens on Avro port and receives events from external Avro client streams.
      • Thrift source: Listens on Thrift port and receives events from external Thrift client streams.
      • Exec source: Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out. If the process exits due to any reason, the source also exits and will produce no further data.
      • JMS source: Reads messages from a JMS destination such as a queue or topic. As per the Flume documentation it should work with any JMS-compliant provider, but it has been tested only with ActiveMQ as of now. The JMS source provides a configurable batch size, message selector, user/password, and message-to-Flume-event converter.
      • Syslog source: Reads syslog data and generates Flume events. The UDP source treats an entire message as a single event, whereas TCP sources create a new event for each string of characters separated by a newline.
      • HTTP source: Accepts Flume events by HTTP POST and GET. HTTP requests are converted into Flume events by a pluggable handler, which must implement the HTTPSourceHandler interface; the handler takes an HttpServletRequest and returns a list of Flume events.
    • Source requires at least one channel to function
  • Channel
    • A channel is the holding area as events are passed from a source to a sink.
    • A passive component that buffers the incoming events until they are drained by Sinks.
    • Different channels offer different levels of durability (Memory, File, Database)
    • Channels are fully transactional.
    • Provides weak ordering guarantees
    • Can work with any number of Sources and Sinks
    • Durable channels ensure that data is never lost in transit.
    • Main channel types:
      • In-Memory channel: Events are stored in an in-memory queue with a configurable max size. Generally it is used for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure.
      • JDBC-based durable channel: The events are stored in a persistent storage that's backed by a database.
      • File-based durable channel: The events are persisted in a file for durability: It also supports encryption i.e. file data can be encrypted for security purposes.
      • Spillable Memory channel: The events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store. The disk store is managed using an embedded File channel. When the in-memory queue is full, additional incoming events are stored in the file channel.
      • Custom channel: A custom channel is your own implementation of the Channel interface. A custom channel's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom channel is its FQCN (class name with package name)
  • Sink
    • An active component that removes events from a Channel and transmits them to their next hop destination.
    • Different types of sinks:
      • HDFS sink: This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files. It supports compression in both file types. The files can be rolled (close current file and create a new one) periodically based on the elapsed time or size of data or number of events.
      • Avro sink: Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair.
      • Thrift sink: Flume events sent to this sink are turned into Thrift events and sent to the configured hostname / port pair.
      • IRC sink: The IRC sink takes messages from attached channel and relays those to configured IRC destinations.
      • LFS or File Roll sink: Stores events on the local filesystem.
      • Null sink: It discards all events it receives from the channel.
      • HBase sink: Sends events to an HBase server. The HBase configuration is picked up from the first hbase-site.xml encountered in the classpath.
      • Morphline Solr sink: This sink extracts data from Flume events, transforms it, and loads it in near-real-time into Apache Solr servers, which in turn serve queries to end users or search applications.
      • ElasticSearch sink: This sink writes data to an elasticsearch cluster.
    • Requires exactly one channel to function
  • Interceptor
    • Interceptors are applied to a source in predetermined order to enable decorating and filtering of events where necessary.
    • Built-in interceptors allow adding headers such as timestamps, hostname, static markers.
    • Custom interceptors can inspect event payload to create specific headers where necessary
    • Default interceptors provided with Flume:
      • Timestamp interceptor: Inserts the time in millis at which it processes the event. This interceptor inserts a header with key timestamp whose value is the relevant timestamp.
      • Host interceptor: Inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.
      • Static interceptor: Static interceptor allows user to append a static header with static value to all events.
      • UUID interceptor: This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fad-b7a586fc1b97, which represents a 128-bit value.
      • Regex Filtering interceptor: This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression. The supplied regular expression can be used to include events or exclude events.
      • Regex Extractor interceptor: This interceptor extracts regex match groups using a specified regular expression and appends the match groups as headers on the event. It also supports pluggable serializers for formatting the match groups before adding them as event headers.
  • Channel Selector
    • A channel selector is used to select one or more channels from all the configured channels, based on preset criteria.
    • Built-in Channel Selectors:
      • Replicating: for duplicating the events.
      • Multiplexing: for routing based on headers.
    • Custom Channel Selectors can use dynamic criteria where needed.
  • Sink Processor
    • Sink Processor is responsible for invoking one sink from a specified group of Sinks.
    • Built-in Sink Processors:
      • Default sink processor: Default sink processor accepts only a single sink. User is not forced to create processor (sink group) for single sink.
      • Failover sink processor: Failover Sink Processor maintains a prioritized list of sinks, guaranteeing that so long as one is available events will be processed (delivered).
      • Load balancing sink processor: Load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks on which the load must be distributed. The implementation supports distributing load using either round_robin or random selection mechanisms. The choice of selection mechanism defaults to ROUND_ROBIN type, but can be overridden via configuration. Custom selection mechanisms are supported via custom classes that inherit from AbstractSinkSelector.
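The failover and round-robin behaviour described for sink processors can be sketched in a few lines of Python. This is only an illustration of the selection logic, not Flume's implementation: the Sink, FailoverProcessor and RoundRobinProcessor classes are hypothetical, and the sketch assumes that a larger priority number means the sink is tried first.

```python
class Sink:
    """Hypothetical stand-in for a sink; `healthy` simulates availability."""
    def __init__(self, name, healthy=True):
        self.name = name
        self.healthy = healthy
        self.delivered = []

    def process(self, event):
        if not self.healthy:
            raise ConnectionError(f"{self.name} is unavailable")
        self.delivered.append(event)


class FailoverProcessor:
    """Try sinks from highest to lowest priority until one accepts the event."""
    def __init__(self, sinks_with_priority):
        # sinks_with_priority is a list of (priority, sink) pairs.
        # Assumption: a larger number means a higher priority.
        ordered = sorted(sinks_with_priority, key=lambda pair: pair[0], reverse=True)
        self.sinks = [sink for _, sink in ordered]

    def process(self, event):
        for sink in self.sinks:
            try:
                sink.process(event)
                return sink.name
            except ConnectionError:
                continue  # fall through to the next sink in priority order
        raise RuntimeError("all sinks failed")


class RoundRobinProcessor:
    """Rotate over the sinks, skipping any sink that fails."""
    def __init__(self, sinks):
        self.sinks = list(sinks)
        self._next = 0

    def process(self, event):
        # Try each sink at most once, starting from the round-robin position.
        for attempt in range(len(self.sinks)):
            index = (self._next + attempt) % len(self.sinks)
            try:
                self.sinks[index].process(event)
                self._next = (index + 1) % len(self.sinks)
                return self.sinks[index].name
            except ConnectionError:
                continue
        raise RuntimeError("all sinks failed")


primary, backup = Sink("k1", healthy=False), Sink("k2")
failover = FailoverProcessor([(10, primary), (5, backup)])
used = failover.process("event-1")  # k1 is down, so k2 receives the event

rr = RoundRobinProcessor([Sink("a"), Sink("b"), Sink("c")])
order = [rr.process(f"e{i}") for i in range(4)]
```

Both processors hide sink failures from the caller as long as at least one sink in the group is reachable.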
Note that Flume uses channel-based transactions to guarantee reliable message delivery. When a message moves from one agent to another, two transactions are started, one on the agent that delivers the event and the other on the agent that receives the event. In order for the sending agent to commit its transaction, it must receive a success indication from the receiving agent. The receiving agent only returns a success indication if its own transaction commits. This ensures guaranteed delivery semantics between the hops that the flow makes.
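A minimal sketch of this two-transaction hand-off, assuming a hypothetical in-memory Channel class and a forward_batch helper (neither is part of Flume's API). The sender removes events from its channel only after the receiver has accepted the whole batch.

```python
class Channel:
    """Hypothetical in-memory channel with all-or-nothing puts."""
    def __init__(self, capacity=100):
        self.capacity = capacity
        self.events = []

    def put_all(self, batch):
        # Receiver-side transaction: reject the whole batch if it does not fit.
        if len(self.events) + len(batch) > self.capacity:
            raise OverflowError("channel full")
        self.events.extend(batch)


def forward_batch(sender, receiver, batch_size):
    """Move up to batch_size events one hop; return True if both sides committed."""
    batch = sender.events[:batch_size]      # sender transaction: take
    try:
        receiver.put_all(batch)             # receiver transaction: put and commit
    except OverflowError:
        return False                        # sender rolls back, events stay in place
    del sender.events[:len(batch)]          # sender commits only after success
    return True


agent1, agent2 = Channel(), Channel(capacity=2)
agent1.events = ["e1", "e2", "e3"]
first_try = forward_batch(agent1, agent2, 3)    # too large for agent2, nothing moves
second_try = forward_batch(agent1, agent2, 2)   # fits, both sides commit
```

If the receiver fails, the events are still in the sender's channel and can be retried, which is what gives the hop its delivery guarantee.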

Event Flows

Typical Flume Event Flow
  • Sending Data
    • Clients send events to agents.
    • Sources operating within the agent receive the events.
    • The source passes received events through interceptors and puts them on the channels identified by the configured channel selector. Note that event population in a Channel is transactional.
  • Data Draining (Internal processing for next step)
    • Sink Processor identifies one of the configured sinks and invokes it.
    • The invoked sink takes events from its configured channel and sends them to the next hop destination. Note that event removal from the channel is transactional.
    • In case the transmission of event fails, the sink processor can take secondary action.
  • Data Transmission
    • Agents transmit events to next hop destinations.
    • In case of transmission failure the second agent retains the data in its channels
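The flow above can be sketched end to end in Python. This is an illustrative model only: events are plain dictionaries, the "type" header, the DEBUG filtering rule and the channel names are hypothetical, and transactions are left out for brevity.

```python
import time


def timestamp_interceptor(event):
    # Adds a "timestamp" header holding the processing time in milliseconds.
    event["headers"]["timestamp"] = str(int(time.time() * 1000))
    return event


def drop_debug_interceptor(event):
    # Filtering interceptor (hypothetical rule): returning None drops the event.
    return None if event["body"].startswith("DEBUG") else event


def multiplexing_selector(event, mapping, default):
    # Route on the value of the (hypothetical) "type" header.
    return mapping.get(event["headers"].get("type"), default)


def source_receive(events, interceptors, channels, mapping, default):
    """Run each event through the interceptor chain, then put it on a channel."""
    for event in events:
        for intercept in interceptors:
            event = intercept(event)
            if event is None:
                break
        if event is None:
            continue  # an interceptor filtered the event out
        channels[multiplexing_selector(event, mapping, default)].append(event)


def sink_drain(channel, destination):
    # Take events from the channel and hand them to the next hop.
    while channel:
        destination.append(channel.pop(0))


channels = {"c_err": [], "c_default": []}
incoming = [
    {"headers": {"type": "error"}, "body": "disk failure"},
    {"headers": {}, "body": "DEBUG noisy"},
    {"headers": {}, "body": "user login"},
]
source_receive(incoming, [timestamp_interceptor, drop_debug_interceptor],
               channels, {"error": "c_err"}, "c_default")

next_hop = []
sink_drain(channels["c_default"], next_hop)
```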
We will see how to set up a Flume agent and send events to HDFS via an Avro source in the next post.

Cassandra | Scalability, HA, Performance

Apache Cassandra is an open source distributed database system that is designed for storing and managing large amounts of data across commodity servers. It can be used as both a real-time operational data store for online transactional applications and a read-intensive database for large-scale business intelligence systems. It is a high-performance, extremely scalable, fault-tolerant distributed database with no single point of failure.

According to Wikipedia "The name Cassandra was inspired by the beautiful mystic seer in Greek mythology whose predictions for the future were never believed."

It was developed at Facebook to power their Inbox Search feature. It was released as an open source project on Google Code in July 2008. Today Cassandra is used by Apple, GitHub, GoDaddy, Netflix, Instagram, eBay, Twitter, Reddit and many other key organisations, and it is one of today's most popular NoSQL databases.

According to cassandra.apache.org, the largest known Cassandra setup (at Apple) involves over 10 PB of data on over 75,000 nodes.

Cassandra is a very interesting product with a wide range of use cases. Its key advantage is support for large volumes of data; at the same time it supports semi-structured data models and models that are expected to change significantly over time. It is a particularly well-suited database option for the following types of use cases:

  • Very large data volumes
  • Very large user transaction volumes
  • High reliability requirements for data storage
  • Dynamic data model

Big Data | Volume, Velocity, Variety

Need of Big Data

Big data is a term for any collection of data sets so large and complex that it becomes difficult to process using traditional tools, RDBMS or traditional data processing applications.
The sizes vary and are a constantly moving target, as of 2012 ranging from a few dozen terabytes to many petabytes of data in a single data set (Wikipedia).

3Vs are three defining properties or dimensions of big data:
    - volume (amount of data)
    - velocity (speed of data in and out)
    - variety (range of data types and sources)


Thursday, July 10, 2014

Spring Integration | Integration Patterns

Enterprise Integration Strategies

In the present era, no system can meet all of its business needs on its own. It needs to interact with a number of interconnected systems to achieve business goals. For this interaction it needs to share data/information among those systems. We can mainly divide these integration strategies into the following categories:
  • Shared File Systems
    Two or more applications share a common file system; one may write to it while the other may poll the file system to read it. The sender and receiver are decoupled in this case. This solution certainly works, but has drawbacks like performance, reliability, and dependency on the File system. 
  • Shared Database
    In this strategy, applications share the data from a single database. One application writes data to the table while the other simply reads from table. The drawback is that this setup forces applications to use a unified schema throughout the organization. Shared databases also pose a few other issues, such as network lag and lock contention. 
  • Remote Procedure Calls
    If data or a process needs to be shared across an organization, one way to expose this functionality is through a remote service, for example an EJB or a SOAP/REST service allows functionality to be exposed to the rest of the enterprise. Using a service, the implementation is encapsulated, allowing the different applications to change the underlying implementation without affecting any integration solution as long as the service interface has not changed. The thing to remember about a service is that the integration is synchronous: both the client and the service must be available for integration to occur, and they must know about each other.  
  • Messaging
    This strategy mainly encourages and supports sender-receiver decoupling. A sender application sends a piece of data enclosed in a message to a messaging middleman and forgets about it. A consumer consumes the message whenever it can and begins its own workflow. One of the advantages of using messaging as the medium is that the sender and receiver are decoupled completely.
These styles are disparate because no one solution will work all the time. To build a generic solution, middleware thinkers thought of a system based on these patterns, typically called an Enterprise Service Bus (ESB). An ESB is the ultimate middleman; it knows how to talk all languages, over all protocols, and mediate messages being passed. JEE and Spring played a key role in simplifying the enterprise programming model. JEE standardized and made commodities of solutions to common enterprise problems like database access, remote procedure invocation, transactions, authentication, directory services, etc. EAI solutions have no direct support in JEE, aside from the basics: RPC, messaging. There are many patterns for Enterprise Application Integration, and just as many protocols that need to be dealt with.

SEDA (Staged Event-Driven Architecture) - An Integration Pattern

Staged event-driven architecture (SEDA) is an approach to build a system that can support massive concurrency without incurring many of the issues involved with using a traditional thread and event based approach. The basic premise is to break the application logic into a series of stages connected by event queues. Each stage may be conditioned to handle increased load by increasing the number of threads for that stage (increasing concurrency).
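A minimal sketch of the idea in Python, using only the standard library: two stages connected by queues, where the second stage is given more threads than the first. The stage handlers (str.strip and str.upper) and the helper names are placeholders chosen for illustration, not part of any SEDA framework.

```python
import queue
import threading

STOP = object()  # sentinel that tells a worker thread to exit


def start_stage(handler, inbox, outbox, workers):
    """Start `workers` threads that read inbox, apply handler, write to outbox."""
    def run():
        while True:
            item = inbox.get()
            if item is STOP:
                break
            outbox.put(handler(item))

    threads = [threading.Thread(target=run) for _ in range(workers)]
    for thread in threads:
        thread.start()
    return threads


def stop_stage(threads, inbox):
    # One sentinel per worker; queued items are processed before the sentinels.
    for _ in threads:
        inbox.put(STOP)
    for thread in threads:
        thread.join()


parse_q, enrich_q, done_q = queue.Queue(), queue.Queue(), queue.Queue()
parsers = start_stage(str.strip, parse_q, enrich_q, workers=1)
enrichers = start_stage(str.upper, enrich_q, done_q, workers=3)  # busier stage, more threads

for line in ["  alpha ", " beta", "gamma  "]:
    parse_q.put(line)

stop_stage(parsers, parse_q)       # drain stage 1 first, then stage 2
stop_stage(enrichers, enrich_q)

results = []
while not done_q.empty():
    results.append(done_q.get())
results.sort()  # worker threads may finish in any order
```

Because each stage only talks to its queues, the thread count of one stage can be tuned without touching the others.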

Kafka | A Messaging Rethought

As per ASF "Apache Kafka is publish-subscribe messaging rethought as a distributed commit log". It is an open-source message broker written in Scala. It provides a unified, high-throughput, low-latency platform for handling real-time data feeds. The design is heavily influenced by transaction logs, hence the description "messaging rethought as a distributed commit log".

Kafka was originally developed by LinkedIn, open sourced in early 2011 and later contributed to the ASF. Kafka is considered the next generation distributed messaging system.
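To make the "commit log" idea concrete, here is a single-process toy in Python. It is not Kafka's API and ignores partitions, replication and persistence; the CommitLog and Consumer classes are hypothetical. It only shows that records are appended with increasing offsets and that each consumer tracks its own position.

```python
class CommitLog:
    """Append-only log; every record gets a monotonically increasing offset."""
    def __init__(self):
        self._records = []

    def append(self, record):
        self._records.append(record)
        return len(self._records) - 1   # offset of the new record

    def read(self, offset, max_records=10):
        return self._records[offset:offset + max_records]


class Consumer:
    """Each consumer keeps its own offset, so reading never removes data."""
    def __init__(self, log):
        self.log = log
        self.offset = 0

    def poll(self, max_records=10):
        batch = self.log.read(self.offset, max_records)
        self.offset += len(batch)
        return batch


log = CommitLog()
offsets = [log.append(message) for message in ["a", "b", "c"]]

fast, slow = Consumer(log), Consumer(log)
fast_batch = fast.poll()      # reads everything
slow_batch = slow.poll(1)     # reads one record and can catch up later
```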

Kafka - High Level View


Hadoop | HDFS

Apache Hadoop is an open-source software framework for storage and large-scale distributed computing on top of commodity hardware. It basically provides us two things:
  • A distributed filesystem called HDFS (Hadoop Distributed File System)
  • A framework and API for building and running MapReduce jobs
HDFS is a file system similar to a regular Unix filesystem, but with data storage distributed across several machines. It is not intended as a replacement for a regular filesystem, but rather as a filesystem running on top of a regular low-level file system (it is like a layer for large distributed systems to use). It has built-in mechanisms to handle machine outages, and is optimized for throughput rather than latency.

Key Features

  • HDFS stores files in blocks typically at least 64 MB in size, much larger than the 4-32 KB seen in most filesystems.
  • HDFS is optimized for throughput over latency; it is very efficient at streaming read requests for large files but poor at seek requests for many small ones.
  • HDFS is optimized for workloads that are generally of the write-once and read-many type.
  • Each storage node runs a process called a DataNode that manages the blocks on that host, and these are coordinated by a master NameNode process running on a separate host.
  • Instead of handling disk failures by having physical redundancies in disk arrays or similar strategies, HDFS uses replication (NameNode takes care of replication, failover)
  • NameNode constantly monitors reports sent by each DataNode to ensure that failures have not dropped any block below the desired replication factor. If this does happen, it schedules the addition of another copy within the cluster.
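The block and replication arithmetic behind these points can be sketched as follows. This is a simplified model, not HDFS code: the helper names are hypothetical, the 64 MB block size is the figure quoted above, and a replication factor of 3 is assumed for the example.

```python
import math

BLOCK_SIZE = 64 * 1024 * 1024   # 64 MB, the block size quoted above


def block_count(file_size_bytes, block_size=BLOCK_SIZE):
    """Number of blocks needed to store a file of the given size."""
    return math.ceil(file_size_bytes / block_size)


def under_replicated(block_locations, live_nodes, replication=3):
    """Return {block: missing_copies} for blocks below the target replication.

    block_locations maps a block id to the DataNodes holding a copy;
    replication=3 is an assumption made for this example.
    """
    missing = {}
    for block, nodes in block_locations.items():
        live_copies = len(set(nodes) & set(live_nodes))
        if live_copies < replication:
            missing[block] = replication - live_copies
    return missing


blocks_for_1gb = block_count(1024 * 1024 * 1024)
blocks_for_200mb = block_count(200 * 1024 * 1024)

locations = {
    "blk_1": ["dn1", "dn2", "dn3"],
    "blk_2": ["dn2", "dn3", "dn4"],
}
# dn4 has stopped reporting, so blk_2 needs one more copy somewhere.
to_fix = under_replicated(locations, live_nodes=["dn1", "dn2", "dn3"])
```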