Monday, October 12, 2015

ZooKeeper | A Reliable, Scalable Distributed Coordination

In previous posts we learnt about various big data projects/systems. All of these systems are distributed and clustered in nature, and for distribution and cluster management each of them needs some low-level API. ZooKeeper can be seen as one of those low-level APIs, which can be used to build a distributed coordination system.

ZooKeeper is a highly reliable, scalable, distributed coordination system. As per the ZooKeeper wiki:
"ZooKeeper allows distributed processes to coordinate with each other through a shared hierarchical name space of data registers".
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization and group services. It provides a very simple interface to a centralized coordination service. The service itself is distributed and highly reliable.

Distributed applications use ZooKeeper to store and mediate updates to important configuration information. Many top-level big data projects like Hadoop, Kafka, HBase, Accumulo and Solr use ZooKeeper as a distributed coordination system. An extensive list of projects powered by ZooKeeper can be found here.

As the ZooKeeper wiki says, it coordinates using a shared hierarchical name space of data registers. In ZooKeeper terms these registers are known as znodes.
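To make the idea of a hierarchical name space of data registers concrete, here is a minimal in-memory sketch in Python. It is only a toy model and not the ZooKeeper client API: the class name `ZNodeTree`, its methods and the sample paths are all hypothetical, chosen for illustration.

```python
class ZNodeTree:
    """Toy in-memory model of a hierarchical name space of data registers.

    This is NOT the ZooKeeper API; all names here are illustrative assumptions.
    """

    def __init__(self):
        # Maps an absolute path such as "/app/config" to the bytes stored there.
        self._data = {"/": b""}

    def create(self, path, data=b""):
        # A node can only be created under an existing parent.
        parent = path.rsplit("/", 1)[0] or "/"
        if parent not in self._data:
            raise KeyError(f"parent {parent} does not exist")
        if path in self._data:
            raise KeyError(f"{path} already exists")
        self._data[path] = data

    def get(self, path):
        return self._data[path]

    def set(self, path, data):
        if path not in self._data:
            raise KeyError(path)
        self._data[path] = data

    def children(self, path):
        # Direct children only: names under the prefix with no further "/".
        prefix = path.rstrip("/") + "/"
        return sorted(
            p[len(prefix):]
            for p in self._data
            if p != path and p.startswith(prefix) and "/" not in p[len(prefix):]
        )


# Hypothetical usage: a small configuration subtree.
tree = ZNodeTree()
tree.create("/app")
tree.create("/app/config", b"timeout=30")
tree.create("/app/workers")
tree.set("/app/config", b"timeout=60")
```

Like a file system, every register is addressed by a slash-separated path, but unlike a plain directory each node in this model can hold data and have children at the same time.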

ZooKeeper comes with a bunch of "out of the box" benefits like:
  • Fast
    • ZooKeeper is fast with workloads where reads to the data are more than writes. The ideal read/write ratio is about 10:1.
  • Reliable
    • ZooKeeper is replicated over a set of servers known as an ensemble. All the servers are visible to each other. The ZK service stays available as long as a majority of the servers are up, hence there is no single point of failure.
  • Simple
    • ZooKeeper follows a simple data model and maintains a standard hierarchical name space, similar to files and directories on a file system.
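The reliability point can be expressed as simple arithmetic: an ensemble keeps serving requests as long as a strict majority of its servers is up. The following is a small sketch of that calculation; the function names are our own and not part of ZooKeeper.

```python
def quorum_size(ensemble_size):
    """Smallest number of servers that forms a strict majority."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size):
    """How many servers can fail while a majority is still available."""
    return ensemble_size - quorum_size(ensemble_size)


# Print ensemble size, quorum size and tolerated failures for a few sizes.
for n in (3, 4, 5):
    print(n, quorum_size(n), tolerated_failures(n))
```

Note that a four-node ensemble tolerates no more failures than a three-node one, which is why ensembles are usually sized with an odd number of servers.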
ZooKeeper Ensemble

Sunday, October 11, 2015

Spark | Lightning Fast Cluster Computing

Apache Spark is an open source cluster computing platform/framework which brings fast, in-memory data processing to Hadoop. Spark's expressive development APIs allow data workers to efficiently execute streaming, machine learning or SQL workloads that require fast iterative access to datasets.

It extends the well-known MapReduce model to efficiently support more types of computation, including interactive queries and stream processing. Speed is the key in processing large datasets.

If we have large amounts of data that require low-latency processing that a typical MapReduce system cannot provide, Spark is the right choice. It performs at speeds up to 100 times faster than MapReduce for iterative algorithms or interactive data mining, because it provides in-memory cluster computing.
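Why does keeping data in memory help iterative algorithms so much? The sketch below is a plain Python analogy, not Spark code: `CountingSource` is a hypothetical stand-in for slow storage, and the toy job simply counts how often the source has to be rescanned with and without caching.

```python
class CountingSource:
    """Stands in for slow storage (e.g. files on disk) and counts full scans."""

    def __init__(self, values):
        self._values = list(values)
        self.scans = 0

    def read(self):
        self.scans += 1
        return list(self._values)


def iterate_towards_mean(load, iterations):
    """Toy iterative job: repeatedly nudge an estimate toward the data mean."""
    estimate = 0.0
    for _ in range(iterations):
        data = load()  # each iteration needs the full dataset
        mean = sum(data) / len(data)
        estimate += 0.5 * (mean - estimate)
    return estimate


source = CountingSource([1.0, 2.0, 3.0, 4.0])

# Without caching: every iteration rescans the source.
uncached_result = iterate_towards_mean(source.read, iterations=10)
uncached_scans = source.scans

# With caching: scan once, keep the data in memory, reuse it in every iteration.
source.scans = 0
cached_data = source.read()
cached_result = iterate_towards_mean(lambda: cached_data, iterations=10)
cached_scans = source.scans
```

Both runs compute the same result, but the cached run touches the slow source once instead of ten times. This is the intuition behind the speedup for iterative workloads, under the assumption that the dataset fits in memory.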

Apache Spark consists of Spark Core and a set of libraries. The core is the distributed execution engine and the Java, Scala, and Python APIs offer a platform for distributed ETL application development.

Spark was originally developed in the AMPLab at the University of California, Berkeley, and later donated to the Apache Software Foundation.

Note that Spark is generally used on top of HDFS. At a high level, we can say that Spark Core may be used in conjunction with HDFS.

Spark combines SQL, streaming and complex analytics together in the same application to handle multiple data processing scenarios. It can access a wide range of data sources such as HDFS, Cassandra, HBase or S3.

An extensive list of users and projects powered by Spark can be found here.

At a high level Spark addresses the following use cases:

  • Streaming Data
    • Apache Spark's key use case is its ability to process streaming data. With so much data being processed on a daily basis, it has become essential for organizations to be able to stream and analyze it all in real time.
  • Machine Learning
    • Spark has a useful implementation of machine learning capabilities, including a wide variety of machine learning algorithms for classification, recommendation, clustering, pattern mining and so on.
  • Interactive Analysis
    • Hadoop MapReduce was originally developed to handle batch processing, and Hadoop query layers such as Hive or Pig are extremely slow for interactive analysis, whereas Spark provides very fast queries to support interactive analysis using its in-memory capabilities. In other words, Spark is a batch analytics system that can behave like an interactive analytics system, because it operates on in-memory RDDs and can therefore cache data between queries.
Spark | Use case reference

Monday, September 28, 2015

Hive | Input & Output Formats

In the previous post we learnt about setting up and running Hive on our distributed Hadoop cluster. In this post we will learn about various Hive input and output formats.

Key Formats

  • AVRO
We will use the same Hadoop cluster and Hive setup done in the previous post.

Usage | Hands On

  • TEXTFILE
    • Delimiter-separated, readable text file, e.g. a text file with tab- or comma-separated fields. This is the default format for Hive (depending on the hive.default.fileformat configuration).
    • Syntax:  STORED AS TEXTFILE 
    • Usually human readable text.
      hive> USE user_db;
      Time taken: 0.044 seconds
      hive> CREATE TABLE IF NOT EXISTS users_txt (uid String, login String, full_name String, email String, country String) COMMENT 'User details' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
      Time taken: 0.384 seconds
      hive> LOAD DATA LOCAL INPATH '/tmp/users.csv' OVERWRITE INTO TABLE user_db.users_txt;
      Loading data to table user_db.users_txt
      Table user_db.users_txt stats: [numFiles=1, numRows=0, totalSize=7860, rawDataSize=0]
      Time taken: 1.138 seconds
      hive> SELECT * FROM users_txt LIMIT 10;
      755777ae-3d5f-415e-ac33-5d24db748e09 rjones0 Randy RU
      a4dae376-970e-4548-908e-cbe6bff88550 mmitchell1 Martin FI
      f4781787-c731-4db6-add2-13ab91de22a0 pharvey2 Peter FR
      d35df636-a7c8-4c50-aa57-e99db4cbdb1a gjames3 Gary LT
      d26c04a3-ca28-4d2e-84cf-0104ad2acb92 rburton4 Russell YE
      6a487cfb-5177-4cc2-bdbd-4bc4751b9592 pharris5 Patrick NO
      3671d7f7-2a75-41dc-be84-609106e5bdfa kcrawford6 Keith PT
      beae01c4-3ee6-4c59-b0d6-60c5811367f2 jedwards7 Juan PH
      899dc8a4-5a8f-44cf-ac23-ae8c3729836c slynch8 Samuel VN
      f274e93d-378c-4377-a9c7-7c235a36b72a mgray9 Martin IE
      Time taken: 0.696 seconds, Fetched: 10 row(s)
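
To see what ROW FORMAT DELIMITED means for a text file, here is a minimal Python sketch that splits raw text into rows and named fields the way the table definition above describes (fields terminated by ',' and lines terminated by '\n'). The column names come from the CREATE TABLE statement; the sample rows are made up for illustration, and the handling of short and long rows reflects our understanding of Hive's default text handling rather than a specification.

```python
# Column order taken from the users_txt table definition above.
COLUMNS = ["uid", "login", "full_name", "email", "country"]


def parse_delimited(text, field_sep=",", line_sep="\n"):
    """Split delimited text into one dict per row, keyed by column name."""
    lines = text.split(line_sep)
    if lines and lines[-1] == "":
        lines.pop()  # a trailing line terminator does not start a new row
    rows = []
    for line in lines:
        fields = line.split(field_sep)
        # Assumption: missing trailing fields read as NULL (None here)
        # and extra fields beyond the declared columns are ignored.
        fields = (fields + [None] * len(COLUMNS))[: len(COLUMNS)]
        rows.append(dict(zip(COLUMNS, fields)))
    return rows


# Hypothetical sample data, not taken from the real users.csv.
sample = (
    "u-001,alice0,Alice,alice@example.com,IN\n"
    "u-002,bob1,Bob,bob@example.com\n"
)
rows = parse_delimited(sample)
```

Since the format has no quoting or escaping in this simple form, a comma inside a field value would be treated as a field separator, which is worth keeping in mind when loading real CSV exports.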

Monday, May 18, 2015

Cassandra | Setup

We learnt about Cassandra in the previous post. Here we will set up a fully distributed Cassandra cluster and run a client against it.


For installation we will use three nodes and install a fully distributed Cassandra cluster. We are using the following details for the complete setup:

  • Installation base directory:
      • /home/anishsneh/installs
  • Installation user name:
      • anishsneh
  • Hostnames: 
      • server01 (first node, say with ip address
      • server02 (second node, say with ip address
      • server03 (third node, say with ip address
Note that in Cassandra all the nodes are equal and there is no MASTER or SLAVE, hence there is NO SINGLE POINT OF FAILURE.

  • Install Cassandra
    • Download the Apache Cassandra binary package from the Apache website.
    • Extract downloaded package to /home/anishsneh/installs, such that we have:
      [anishsneh@server01 installs]$ ls -ltr apache-cassandra-2.1.5/
      total 360
      -rw-r--r--. 1 anishsneh anishsneh   2117 Apr 27 07:33 NOTICE.txt
      -rw-r--r--. 1 anishsneh anishsneh  64431 Apr 27 07:33 NEWS.txt
      -rw-r--r--. 1 anishsneh anishsneh  11609 Apr 27 07:33 LICENSE.txt
      -rw-r--r--. 1 anishsneh anishsneh 245971 Apr 27 07:33 CHANGES.txt
      drwxr-xr-x. 2 anishsneh anishsneh   4096 May 17 15:37 interface
      drwxr-xr-x. 4 anishsneh anishsneh   4096 May 17 15:37 javadoc
      drwxr-xr-x. 3 anishsneh anishsneh   4096 May 17 15:37 lib
      drwxr-xr-x. 3 anishsneh anishsneh   4096 May 17 15:37 pylib
      drwxr-xr-x. 4 anishsneh anishsneh   4096 May 17 15:37 tools
      drwxr-xr-x. 2 anishsneh anishsneh   4096 May 17 15:37 bin
      drwxrwxr-x. 2 anishsneh anishsneh   4096 May 17 15:51 logs
      drwxrwxr-x. 5 anishsneh anishsneh   4096 May 17 15:51 data
      drwxr-xr-x. 3 anishsneh anishsneh   4096 May 17 16:46 conf
    • Repeat the above steps on all three nodes.

Thursday, March 26, 2015

Cassandra | Quick Dive

In the previous post we learnt about the basics of Cassandra and the CAP theorem. In this post we will have a closer look at the Cassandra data model and how Cassandra works.

Data Model

Cassandra can be defined as a hybrid between a key-value and a column-oriented database. In the Cassandra world, the data model can be seen as a map which is distributed across the cluster. In other words, a table in Cassandra is a distributed multi-dimensional map indexed by a key.
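
The "multi-dimensional map indexed by a key" can be pictured as nested dictionaries: table, then row key, then column name. The Python sketch below is only a mental model. The names `keyspace`, `put`, `get` and `node_for` are hypothetical, and the hash-modulo placement is a deliberate simplification of how Cassandra actually assigns keys to nodes.

```python
import hashlib
from collections import defaultdict

# table name -> row key -> column name -> value
keyspace = defaultdict(lambda: defaultdict(dict))


def put(table, row_key, column, value):
    keyspace[table][row_key][column] = value


def get(table, row_key, column=None):
    """Return one column value, or the whole row when no column is given."""
    row = keyspace[table].get(row_key, {})
    return row if column is None else row.get(column)


def node_for(row_key, nodes):
    """Pick the node owning a row key.

    Simplified assumption: hash the key and take it modulo the node count.
    Real Cassandra places keys on a token ring instead.
    """
    digest = hashlib.md5(row_key.encode("utf-8")).hexdigest()
    return nodes[int(digest, 16) % len(nodes)]


# Hypothetical data: rows of the same table may carry different columns.
put("users", "rjones0", "full_name", "Randy")
put("users", "rjones0", "country", "RU")
put("users", "pharvey2", "country", "FR")

nodes = ["server01", "server02", "server03"]
owner = node_for("rjones0", nodes)
```

The row key is what decides where a row lives in the cluster, which is why lookups by key are cheap while queries on other columns need an index.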

Cassandra Data Model

Tuesday, March 24, 2015

Cassandra | Internals

In the previous post we learnt about the Cassandra data model and replication concepts. In this post we will look at the Cassandra architecture and read/write internals.

Architecture | Highlights

  • Cassandra was designed with the system/hardware failures that occur in the real world in mind.
  • It is a peer-to-peer, distributed system in which all nodes are alike, which results in a read/write-anywhere design.
  • Data is transparently partitioned among all nodes in the cluster.
  • Custom data replication is provided out of the box to ensure fault tolerance.
  • In a Cassandra cluster each node communicates with the others through the GOSSIP protocol, which exchanges information across the cluster every second.
  • A commit log is used on each node to capture write activity. Data durability is assured.
  • At the same time data is also written to an in-memory structure (memtable) and then to disk (as an SSTable) once the memory structure is full.
  • A row in a column family is indexed by its key. Other columns may be indexed as well; indexes are needed to search Cassandra quickly by those columns. Note that in Cassandra, indexes are effectively separate tables.
  • Consistency can be chosen between strong and eventual (from all nodes to any single node responding), depending on the need. It can be set on a per-request basis, for both reads and writes.
  • Provides data compression out of the box, applied at the column family level. Google's Snappy is one of the supported compression algorithms. Compression usually carries little performance penalty, since the CPU cost is offset by reduced disk I/O.
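
The write path described in the highlights above (commit log, memtable, flush to an SSTable) can be sketched in a few lines of Python. This is a toy model under our own assumptions: the `Node` class, the tiny `memtable_limit` and the in-memory "disk" are hypothetical, and real Cassandra adds much more (compaction, bloom filters, replication and so on).

```python
class Node:
    """Toy model of a single node's write path; not Cassandra code."""

    def __init__(self, memtable_limit=3):
        self.commit_log = []   # append-only record of writes, for durability
        self.memtable = {}     # in-memory, mutable view of recent writes
        self.sstables = []     # immutable, sorted snapshots standing in for disk
        self.memtable_limit = memtable_limit

    def write(self, key, value):
        # 1. Record the write in the commit log first.
        self.commit_log.append((key, value))
        # 2. Apply it to the in-memory memtable.
        self.memtable[key] = value
        # 3. Flush to an SSTable once the memtable is full.
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        # An SSTable is written once, sorted by key, and never modified.
        self.sstables.append(tuple(sorted(self.memtable.items())))
        self.memtable = {}
        # Flushed writes are on "disk", so their log entries can be dropped.
        self.commit_log.clear()

    def read(self, key):
        # The newest data wins: check the memtable, then SSTables newest first.
        if key in self.memtable:
            return self.memtable[key]
        for table in reversed(self.sstables):
            for k, v in table:
                if k == key:
                    return v
        return None


# Hypothetical usage with a very small memtable to force a flush.
node = Node(memtable_limit=2)
node.write("a", 1)
node.write("b", 2)   # memtable is full, so it is flushed to an SSTable
node.write("a", 3)   # newer value for "a" now lives in the memtable
```

Because SSTables are never updated in place, an overwrite simply shadows the older value, and the read has to consult the newest location first.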