Saturday, September 13, 2014

Kafka | Setup

We learnt about Kafka in the previous post. Here we will set up and run a three node (fully distributed) Kafka cluster.

Installation

For installation we will use the three CentOS VMs which we configured in an earlier post, running one Kafka broker on each.

Note that we are using the following details for the installation (for the complete Kafka setup):

     - Installation base directory:  
      • /home/anishsneh/installs
     - Installation user name:
      • anishsneh
     - Hostnames: 
      • server01 (broker 1)
      • server02 (broker 2)
      • server03 (broker 3)
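All three hosts must be able to resolve each other by name; if DNS is not configured, /etc/hosts entries like the following are needed on every VM (the IP addresses below are placeholders for the VMs from the earlier post):
       192.168.1.101 server01
       192.168.1.102 server02
       192.168.1.103 server03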
Steps to install Kafka:
  1. Install Kafka. We will use Kafka 0.8.1.1 (kafka_2.10-0.8.1.1.tgz), built with Scala 2.10
    • Download kafka_2.10-0.8.1.1.tgz from the Apache Kafka downloads page
    • Extract the downloaded package to /home/anishsneh/installs, such that we have:
      [anishsneh@server01 installs]$ ls -ltr kafka_2.10-0.8.1.1
      total 28
      -rw-rw-r--. 1 anishsneh anishsneh   162 Apr 22 11:37 NOTICE
      -rw-rw-r--. 1 anishsneh anishsneh 11358 Apr 22 11:37 LICENSE
      drwxr-xr-x. 2 anishsneh anishsneh  4096 Apr 22 12:26 libs
      drwxr-xr-x. 2 anishsneh anishsneh  4096 Apr 22 12:26 config
      drwxr-xr-x. 3 anishsneh anishsneh  4096 Apr 22 12:26 bin
      
    • Repeat the above steps on all three hosts (or copy the tarball over, as sketched below).
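      Instead of downloading on each VM, one option is to copy the tarball from server01 and extract it there (a sketch, assuming the anishsneh user can scp between the hosts):
      [anishsneh@server01 installs]$ scp kafka_2.10-0.8.1.1.tgz anishsneh@server02:/home/anishsneh/installs/
      [anishsneh@server01 installs]$ scp kafka_2.10-0.8.1.1.tgz anishsneh@server03:/home/anishsneh/installs/
      [anishsneh@server02 installs]$ tar -xzf kafka_2.10-0.8.1.1.tgz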
  2. Configure Cluster
    • Set KAFKA_HOME=/home/anishsneh/installs/kafka_2.10-0.8.1.1 in ~/.bashrc (or wherever you maintain environment variables), then reload the profile
    • Create the /home/anishsneh/installs/data/kafka/zookeeper directory for Kafka's ZooKeeper data and /home/anishsneh/installs/tmp/kafka-logs for Kafka logs; the commands are shown below
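      For reference, the commands for the above two bullets (append the export to ~/.bashrc, reload it, then create the directories):
      echo 'export KAFKA_HOME=/home/anishsneh/installs/kafka_2.10-0.8.1.1' >> ~/.bashrc
      source ~/.bashrc
      mkdir -p /home/anishsneh/installs/data/kafka/zookeeper
      mkdir -p /home/anishsneh/installs/tmp/kafka-logs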
    • Edit $KAFKA_HOME/config/server.properties with the following:
      # Use broker.id=1/host.name=server01 on the first broker,
      # broker.id=2/host.name=server02 on the second and
      # broker.id=3/host.name=server03 on the third
      broker.id=1
      host.name=server01
      log.dirs=/home/anishsneh/installs/tmp/kafka-logs
      zookeeper.connect=server01:2181,server02:2181,server03:2181
      
      Repeat on all three hosts, adjusting broker.id and host.name on each (a sketch follows).
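      A sketch for fixing the per-host values after copying the same server.properties to every host (use 3/server03 on the third broker):
      [anishsneh@server02 installs]$ sed -i 's/^broker.id=.*/broker.id=2/' $KAFKA_HOME/config/server.properties
      [anishsneh@server02 installs]$ sed -i 's/^host.name=.*/host.name=server02/' $KAFKA_HOME/config/server.properties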
    • Edit $KAFKA_HOME/config/zookeeper.properties with the following:
      dataDir=/home/anishsneh/installs/data/kafka/zookeeper
      clientPort=2181
      # initLimit and syncLimit are required once server.N entries are present (replicated mode)
      initLimit=5
      syncLimit=2
      server.1=server01:2888:3888
      server.2=server02:2888:3888
      server.3=server03:2888:3888
      
      Repeat on all three hosts; this file is identical on every node.
    • Create the file /home/anishsneh/installs/data/kafka/zookeeper/myid (i.e. in the ZooKeeper data directory on each node of the Kafka cluster); its only content is that node's id:
      # Write 1 on the first broker, 2 on the second, 3 on the third
      echo "1" > /home/anishsneh/installs/data/kafka/zookeeper/myid
      
  3. Start Cluster
    • Start ZooKeeper using the bundled startup script:
      $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
      Repeat on all three servers, i.e. start ZooKeeper on server01, server02 and server03 in our case
    • Start the Kafka server using the bundled startup script:
      $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
      Repeat on all three servers, i.e. start the broker on server01, server02 and server03 in our case
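      Note that both startup scripts run in the foreground; to keep the processes alive after the terminal closes, one option (a sketch, the log file locations are arbitrary) is:
      nohup $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties > /home/anishsneh/installs/tmp/zookeeper-console.log 2>&1 &
      nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > /home/anishsneh/installs/tmp/kafka-console.log 2>&1 &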
  4. Verify Installation
    • Execute the jps command ($JAVA_HOME/bin/jps) on all three Kafka nodes in the cluster; it should show the following running processes:
      [anishsneh@server01 installs]$ jps
      57690 Jps
      57558 QuorumPeerMain
      57621 Kafka
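      
    • Optionally check the ZooKeeper ensemble state with the four-letter stat command (a sketch, assuming nc/netcat is installed; exactly one node should report Mode: leader, the others Mode: follower):
      echo stat | nc server01 2181 | grep Mode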
      
  5. Create First Topic
    • Create topic
      [anishsneh@server01 installs]$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic kafka-demo-topic
      Created topic "kafka-demo-topic".
      
      We use localhost in the connection string since we are connecting to the ZooKeeper instance running on the same host
    • Verify topic
      [anishsneh@server01 installs]$ $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka-demo-topic
      Topic:kafka-demo-topic PartitionCount:3 ReplicationFactor:3 Configs:
       Topic: kafka-demo-topic Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
       Topic: kafka-demo-topic Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
       Topic: kafka-demo-topic Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
      
      Here Leader is the id of the broker currently serving reads and writes for the partition, Replicas lists the brokers holding a copy of it, and Isr is the subset of replicas currently in sync with the leader. Again we use localhost since we are connecting to the local ZooKeeper instance
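    • Optionally test fault tolerance: stop the broker on one host and re-run the describe command; partitions led by that broker should elect a new leader, and its id should drop out of the Isr list. A sketch:
      [anishsneh@server03 installs]$ $KAFKA_HOME/bin/kafka-server-stop.sh
      [anishsneh@server01 installs]$ $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka-demo-topic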
  6. Send/Receive Message
    • To start the command line consumer, open a terminal and run the default console consumer shipped with the Kafka bundle:
      [anishsneh@server01 installs]$ $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic kafka-demo-topic
      SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
      SLF4J: Defaulting to no-operation (NOP) logger implementation
      SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
      
    • To start the command line producer, open another terminal and run the default console producer shipped with the Kafka bundle:
      [anishsneh@server02 installs]$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list server01:9092 --topic kafka-demo-topic
      SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
      SLF4J: Defaulting to no-operation (NOP) logger implementation
      SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
      
    • Enter any text in the producer console; it should appear in the consumer terminal window.
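      Since the console producer reads from stdin, a quick non-interactive test is also possible:
      echo "hello kafka" | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list server01:9092 --topic kafka-demo-topic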

Producer/Consumer - Java API

  • Create two Maven projects in Eclipse, kafka-producer and kafka-consumer, each with the standard Maven jar layout:
    [screenshot of the Eclipse project structure omitted]

  • Edit kafka-consumer pom.xml with the following contents:
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.anishsneh.demo.kafka</groupId>
     <artifactId>kafka-consumer</artifactId>
     <version>0.0.1-SNAPSHOT</version>
     <packaging>jar</packaging>
     <name>kafka-consumer</name>
     <url>http://maven.apache.org</url>
     <dependencies>
      <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <version>0.8.1</version>
       <!-- exclude old JMX/JMS artifacts that are not resolvable from public repositories -->
       <exclusions>
        <exclusion>
         <groupId>com.sun.jmx</groupId>
         <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
         <groupId>com.sun.jdmk</groupId>
         <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
         <groupId>javax.jms</groupId>
         <artifactId>jms</artifactId>
        </exclusion>
       </exclusions>
      </dependency>
      <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>3.8.1</version>
       <scope>test</scope>
      </dependency>
     </dependencies>
    </project>

  • Edit kafka-producer pom.xml with the following contents:
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.anishsneh.demo.kafka</groupId>
     <artifactId>kafka-producer</artifactId>
     <version>0.0.1-SNAPSHOT</version>
     <packaging>jar</packaging>
     <name>kafka-producer</name>
     <url>http://maven.apache.org</url>
     <dependencies>
      <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <version>0.8.1</version>
       <!-- exclude old JMX/JMS artifacts that are not resolvable from public repositories -->
       <exclusions>
        <exclusion>
         <groupId>com.sun.jmx</groupId>
         <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
         <groupId>com.sun.jdmk</groupId>
         <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
         <groupId>javax.jms</groupId>
         <artifactId>jms</artifactId>
        </exclusion>
       </exclusions>
      </dependency>
      <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>3.8.1</version>
       <scope>test</scope>
      </dependency>
     </dependencies>
    </project>

  • Add KafkaConsumer.java class with the following contents:
    package com.anishsneh.demo.kafka;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class KafkaConsumer {

        public static void main(String[] args) {

            // High level consumer configuration; in Kafka 0.8.x offsets and
            // group membership are coordinated through ZooKeeper
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "first_consumer_group");
            props.put("zookeeper.session.timeout.ms", "30000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");

            ConsumerConfig cf = new ConsumerConfig(props);
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf);

            String topic = "kafka-demo-topic";

            // Ask for a single stream for the topic; all three partitions
            // are multiplexed onto this one stream
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, Integer.valueOf(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

            KafkaStream<byte[], byte[]> stream = streams.get(0);

            // Blocking iterator; hasNext() waits until the next message arrives
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            int i = 1;
            while (it.hasNext()) {
                System.out.println("INDEX: " + i + ": MESSAGE: " + new String(it.next().message()));
                ++i;
            }
            consumer.shutdown();
        }
    }
    
    
  • Add KafkaProducer.java class with the following contents:
    package com.anishsneh.demo.kafka;

    import java.util.Properties;
    import java.util.UUID;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class KafkaProducer {

        public static void main(String[] args) throws InterruptedException {

            Properties props = new Properties();

            // Bootstrap list used to discover the full cluster metadata;
            // it need not contain every broker
            props.put("metadata.broker.list", "server01:9092,server02:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // an acknowledgement from the partition leader is sufficient
            props.put("request.required.acks", "1");

            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);

            String topic = "kafka-demo-topic";

            // Send one message every two seconds; the message key (the index)
            // is hashed by the default partitioner to pick a partition
            for (int i = 1; i <= 1000; i++) {
                String msg = "UUID: " + UUID.randomUUID().toString() + "; AT: " + System.currentTimeMillis();
                System.out.println(msg);
                KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, String.valueOf(i), msg);
                producer.send(data);
                Thread.sleep(2000L);
            }
            producer.close();
        }
    }
    
    
  • Create a single executable jar for the producer and the consumer project respectively (we may use Eclipse export with extracted dependencies, the Maven assembly plugin, the Maven shade plugin, etc.; a shade sketch is shown below). Say we have the producer jar kafka-producer.jar and the consumer jar kafka-consumer.jar
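    A minimal sketch using the Maven shade plugin (added to the <build> section of each pom; the main class shown is for the producer project, use com.anishsneh.demo.kafka.KafkaConsumer for the consumer):
    <build>
     <plugins>
      <plugin>
       <groupId>org.apache.maven.plugins</groupId>
       <artifactId>maven-shade-plugin</artifactId>
       <version>2.3</version>
       <executions>
        <execution>
         <phase>package</phase>
         <goals>
          <goal>shade</goal>
         </goals>
         <configuration>
          <transformers>
           <!-- sets Main-Class in the manifest so that java -jar works -->
           <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>com.anishsneh.demo.kafka.KafkaProducer</mainClass>
           </transformer>
          </transformers>
         </configuration>
        </execution>
       </executions>
      </plugin>
     </plugins>
    </build>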
  • Start consumer using:
    [anishsneh@server01 tmp]$ java -jar <PATH_TO_JAR>kafka-consumer.jar
    
  • Start producer using:
    [anishsneh@server01 tmp]$ java -jar <PATH_TO_JAR>kafka-producer.jar
    
  • After a few seconds we will see in the producer console that messages are being sent:
    [anishsneh@server01 tmp]$ java -jar kafka-producer.jar 
    log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
    log4j:WARN Please initialize the log4j system properly.
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    UUID: 87b94fe3-523b-4e02-89a8-9bcf1ad39332; AT: 1410570722367
    UUID: cbd637c4-e925-4d67-b694-dc4d6bc128bf; AT: 1410570724581
    UUID: db3b2330-9074-432b-884f-4a6b874f6717; AT: 1410570726606
    UUID: 13cbdb3e-2589-4673-8b3e-6d534c43874f; AT: 1410570728631
    UUID: 2028b3a5-f399-45e0-b681-e35900ef5c2e; AT: 1410570730644
    
    
    
     and in the consumer console that messages are being received:
    [anishsneh@server01 tmp]$ java -jar kafka-consumer.jar 
    log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
    log4j:WARN Please initialize the log4j system properly.
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    INDEX: 1: MESSAGE: UUID: 87b94fe3-523b-4e02-89a8-9bcf1ad39332; AT: 1410570722367
    INDEX: 2: MESSAGE: UUID: cbd637c4-e925-4d67-b694-dc4d6bc128bf; AT: 1410570724581
    INDEX: 3: MESSAGE: UUID: db3b2330-9074-432b-884f-4a6b874f6717; AT: 1410570726606
    INDEX: 4: MESSAGE: UUID: 13cbdb3e-2589-4673-8b3e-6d534c43874f; AT: 1410570728631
    INDEX: 5: MESSAGE: UUID: 2028b3a5-f399-45e0-b681-e35900ef5c2e; AT: 1410570730644