欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka 从入门到精通2 、 创建kafka 生产者与消费者实例

程序员文章站 2024-02-27 23:51:03
...

上一篇 :kafka 单机版和分布式版安装

首先创建一个生产者:

package org.training.hadoop.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


public class KafkaProducerExample
{
    public void produceMessage()
    {
        Properties props = getConfig();
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 1000; i++) {
            System.out.println("i:" + i);
            producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)));
            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        producer.close();
    }

    // config
    public Properties getConfig()
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args)
    {
        KafkaProducerExample example = new KafkaProducerExample();
        example.produceMessage();
    }
}

创建消费者:

package org.training.hadoop.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;


public class KafkaConsumerExample
{
    //config
    public static Properties getConfig()
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "testGroup");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }

    public void consumeMessage()
    {
        // launch 3 threads to consume
        int numConsumers = 3;
        final String topic = "test";
        final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
        final List<KafkaConsumerRunner> consumers = new ArrayList<KafkaConsumerRunner>();
        for (int i = 0; i < numConsumers; i++) {
            KafkaConsumerRunner consumer = new KafkaConsumerRunner(topic);
            consumers.add(consumer);
            executor.submit(consumer);
        }

        Runtime.getRuntime().addShutdownHook(new Thread()
        {
            @Override
            public void run()
            {
                for (KafkaConsumerRunner consumer : consumers) {
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    // Thread to consume kafka data
    public static class KafkaConsumerRunner
            implements Runnable
    {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;
        private final String topic;

        public KafkaConsumerRunner(String topic)
        {
            Properties props = getConfig();
            consumer = new KafkaConsumer<String, String>(props);
            this.topic = topic;
        }

        public void handleRecord(ConsumerRecord record)
        {
            System.out.println("name: " + Thread.currentThread().getName() + " ; topic: " + record.topic() + " ; offset" + record.offset() + " ; key: " + record.key() + " ; value: " + record.value());
        }

        public void run()
        {
            try {
                // subscribe
                consumer.subscribe(Arrays.asList(topic));
                while (!closed.get()) {
                    //read data
                    ConsumerRecords<String, String> records = consumer.poll(10000);
                    // Handle new records
                    for (ConsumerRecord<String, String> record : records) {
                        handleRecord(record);
                    }
                }
            }
            catch (WakeupException e) {
                // Ignore exception if closing
                if (!closed.get()) {
                    throw e;
                }
            }
            finally {
                consumer.close();
            }
        }

        // Shutdown hook which can be called from a separate thread
        public void shutdown()
        {
            closed.set(true);
            consumer.wakeup();
        }
    }

    public static void main(String[] args)
    {
        KafkaConsumerExample example = new KafkaConsumerExample();
        example.consumeMessage();
    }
}

创建好之后,运行生产者查看控制台:

可以看到源源不断的生产数据和消费数据:

/home/hadoopuser/jdk1.8.0_111/bin/java -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:55147,suspend=y,server=n -Dfile.encoding=UTF-8 -classpath /home/hadoopuser/jdk1.8.0_111/jre/lib/charsets.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/deploy.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/ext/cldrdata.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/ext/dnsns.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/ext/jaccess.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/ext/jfxrt.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/ext/localedata.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/ext/nashorn.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/ext/sunec.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/ext/sunjce_provider.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/ext/sunpkcs11.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/ext/zipfs.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/javaws.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/jce.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/jfr.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/jfxswt.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/jsse.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/management-agent.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/plugin.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/resources.jar:/home/hadoopuser/jdk1.8.0_111/jre/lib/rt.jar:/home/hadoopuser/bigdata_projects/AuraHadoopTraining-master/target/classes:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-client/2.7.3/hadoop-client-2.7.3.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-common/2.7.3/hadoop-common-2.7.3.jar:/home/hadoopuser/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar:/home/hadoopuser/.m2/repository/org/apache/commons/commons-math3/3.1.1/commons-math3-3.1.1.jar:/home/hadoopuser/.m2/repository/xmlenc/xmlenc/0.52/xmlenc-0.52.jar:/home/hadoopuser/.m2/repository/commons-net/commons-net/3.1/commons-net-3.1.jar:/home/hadoopuser/.m2/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/home/hadoopuser/.m2/repository/javax/servlet/jsp/jsp-api/2.1/jsp-api-2.1.jar:/home/hadoopuser/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/home/hadoopuser/.m2/repository/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar:/home/hadoopuser/.m2/repository/commons-digester/commons-digester/1.8/commons-digester-1.8.jar:/home/hadoopuser/.m2/repository/commons-beanutils/commons-beanutils/1.7.0/commons-beanutils-1.7.0.jar:/home/hadoopuser/.m2/repository/commons-beanutils/commons-beanutils-core/1.8.0/commons-beanutils-core-1.8.0.jar:/home/hadoopuser/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar:/home/hadoopuser/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar:/home/hadoopuser/.m2/repository/org/apache/curator/curator-client/2.7.1/curator-client-2.7.1.jar:/home/hadoopuser/.m2/repository/org/apache/curator/curator-recipes/2.7.1/curator-recipes-2.7.1.jar:/home/hadoopuser/.m2/repository/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-hdfs/2.7.3/hadoop-hdfs-2.7.3.jar:/home/hadoopuser/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/home/hadoopuser/.m2/repository/xerces/xercesImpl/2.9.1/xercesImpl-2.9.1.jar:/home/hadoopuser/.m2/repository/xml-apis/xml-apis/1.3.04/xml-apis-1.3.04.jar:/home/hadoopuser/.m2/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldbjni-all-1.8.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-app/2.7.3/hadoop-mapreduce-client-app-2.7.3.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-common/2.7.3/hadoop-mapreduce-client-common-2.7.3.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-yarn-client/2.7.3/hadoop-yarn-client-2.7.3.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-yarn-server-common/2.7.3/hadoop-yarn-server-common-2.7.3.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-shuffle/2.7.3/hadoop-mapreduce-client-shuffle-2.7.3.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-yarn-api/2.7.3/hadoop-yarn-api-2.7.3.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-core/2.7.3/hadoop-mapreduce-client-core-2.7.3.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-yarn-common/2.7.3/hadoop-yarn-common-2.7.3.jar:/home/hadoopuser/.m2/repository/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar:/home/hadoopuser/.m2/repository/javax/xml/stream/stax-api/1.0-2/stax-api-1.0-2.jar:/home/hadoopuser/.m2/repository/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar:/home/hadoopuser/.m2/repository/com/sun/jersey/jersey-core/1.9/jersey-core-1.9.jar:/home/hadoopuser/.m2/repository/com/sun/jersey/jersey-client/1.9/jersey-client-1.9.jar:/home/hadoopuser/.m2/repository/org/codehaus/jackson/jackson-jaxrs/1.9.13/jackson-jaxrs-1.9.13.jar:/home/hadoopuser/.m2/repository/org/codehaus/jackson/jackson-xc/1.9.13/jackson-xc-1.9.13.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-jobclient/2.7.3/hadoop-mapreduce-client-jobclient-2.7.3.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-annotations/2.7.3/hadoop-annotations-2.7.3.jar:/home/hadoopuser/.m2/repository/org/apache/flume/flume-ng-sdk/1.6.0/flume-ng-sdk-1.6.0.jar:/home/hadoopuser/.m2/repository/org/apache/avro/avro/1.7.4/avro-1.7.4.jar:/home/hadoopuser/.m2/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar:/home/hadoopuser/.m2/repository/org/apache/avro/avro-ipc/1.7.4/avro-ipc-1.7.4.jar:/home/hadoopuser/.m2/repository/org/mortbay/jetty/jetty/6.1.26/jetty-6.1.26.jar:/home/hadoopuser/.m2/repository/org/apache/velocity/velocity/1.7/velocity-1.7.jar:/home/hadoopuser/.m2/repository/io/netty/netty/3.5.12.Final/netty-3.5.12.Final.jar:/home/hadoopuser/.m2/repository/org/apache/thrift/libthrift/0.9.0/libthrift-0.9.0.jar:/home/hadoopuser/.m2/repository/org/apache/kafka/kafka-clients/0.10.0.1/kafka-clients-0.10.0.1.jar:/home/hadoopuser/.m2/repository/net/jpountz/lz4/lz4/1.3.0/lz4-1.3.0.jar:/home/hadoopuser/.m2/repository/org/xerial/snappy/snappy-java/1.1.2.6/snappy-java-1.1.2.6.jar:/home/hadoopuser/.m2/repository/org/slf4j/slf4j-api/1.7.21/slf4j-api-1.7.21.jar:/home/hadoopuser/.m2/repository/org/apache/hbase/hbase-client/1.2.4/hbase-client-1.2.4.jar:/home/hadoopuser/.m2/repository/org/apache/hbase/hbase-annotations/1.2.4/hbase-annotations-1.2.4.jar:/home/hadoopuser/.m2/repository/com/github/stephenc/findbugs/findbugs-annotations/1.3.9-1/findbugs-annotations-1.3.9-1.jar:/home/hadoopuser/.m2/repository/org/apache/hbase/hbase-common/1.2.4/hbase-common-1.2.4.jar:/home/hadoopuser/.m2/repository/org/apache/hbase/hbase-protocol/1.2.4/hbase-protocol-1.2.4.jar:/home/hadoopuser/.m2/repository/commons-codec/commons-codec/1.9/commons-codec-1.9.jar:/home/hadoopuser/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/home/hadoopuser/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar:/home/hadoopuser/.m2/repository/commons-logging/commons-logging/1.2/commons-logging-1.2.jar:/home/hadoopuser/.m2/repository/com/google/guava/guava/12.0.1/guava-12.0.1.jar:/home/hadoopuser/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar:/home/hadoopuser/.m2/repository/io/netty/netty-all/4.0.23.Final/netty-all-4.0.23.Final.jar:/home/hadoopuser/.m2/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar:/home/hadoopuser/.m2/repository/org/apache/htrace/htrace-core/3.1.0-incubating/htrace-core-3.1.0-incubating.jar:/home/hadoopuser/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar:/home/hadoopuser/.m2/repository/org/jruby/jcodings/jcodings/1.0.8/jcodings-1.0.8.jar:/home/hadoopuser/.m2/repository/org/jruby/joni/joni/2.1.2/joni-2.1.2.jar:/home/hadoopuser/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-auth/2.5.1/hadoop-auth-2.5.1.jar:/home/hadoopuser/.m2/repository/org/apache/directory/server/apacheds-kerberos-codec/2.0.0-M15/apacheds-kerberos-codec-2.0.0-M15.jar:/home/hadoopuser/.m2/repository/org/apache/directory/server/apacheds-i18n/2.0.0-M15/apacheds-i18n-2.0.0-M15.jar:/home/hadoopuser/.m2/repository/org/apache/directory/api/api-asn1-api/1.0.0-M20/api-asn1-api-1.0.0-M20.jar:/home/hadoopuser/.m2/repository/org/apache/directory/api/api-util/1.0.0-M20/api-util-1.0.0-M20.jar:/home/hadoopuser/.m2/repository/junit/junit/4.12/junit-4.12.jar:/home/hadoopuser/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-jdbc/2.1.1/hive-jdbc-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-common/2.1.1/hive-common-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-storage-api/2.1.1/hive-storage-api-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-orc/2.1.1/hive-orc-2.1.1.jar:/home/hadoopuser/.m2/repository/org/iq80/snappy/snappy/0.2/snappy-0.2.jar:/home/hadoopuser/.m2/repository/org/eclipse/jetty/aggregate/jetty-all/7.6.0.v20120127/jetty-all-7.6.0.v20120127.jar:/home/hadoopuser/.m2/repository/org/apache/geronimo/specs/geronimo-jta_1.1_spec/1.1.1/geronimo-jta_1.1_spec-1.1.1.jar:/home/hadoopuser/.m2/repository/javax/mail/mail/1.4.1/mail-1.4.1.jar:/home/hadoopuser/.m2/repository/javax/activation/activation/1.1/activation-1.1.jar:/home/hadoopuser/.m2/repository/org/apache/geronimo/specs/geronimo-jaspic_1.0_spec/1.0/geronimo-jaspic_1.0_spec-1.0.jar:/home/hadoopuser/.m2/repository/org/apache/geronimo/specs/geronimo-annotation_1.0_spec/1.1.1/geronimo-annotation_1.0_spec-1.1.1.jar:/home/hadoopuser/.m2/repository/asm/asm-commons/3.1/asm-commons-3.1.jar:/home/hadoopuser/.m2/repository/asm/asm-tree/3.1/asm-tree-3.1.jar:/home/hadoopuser/.m2/repository/asm/asm/3.1/asm-3.1.jar:/home/hadoopuser/.m2/repository/org/eclipse/jetty/orbit/javax.servlet/3.0.0.v201112011016/javax.servlet-3.0.0.v201112011016.jar:/home/hadoopuser/.m2/repository/joda-time/joda-time/2.5/joda-time-2.5.jar:/home/hadoopuser/.m2/repository/org/apache/logging/log4j/log4j-web/2.4.1/log4j-web-2.4.1.jar:/home/hadoopuser/.m2/repository/org/json/json/20090211/json-20090211.jar:/home/hadoopuser/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.0/metrics-core-3.1.0.jar:/home/hadoopuser/.m2/repository/io/dropwizard/metrics/metrics-jvm/3.1.0/metrics-jvm-3.1.0.jar:/home/hadoopuser/.m2/repository/io/dropwizard/metrics/metrics-json/3.1.0/metrics-json-3.1.0.jar:/home/hadoopuser/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.4.2/jackson-databind-2.4.2.jar:/home/hadoopuser/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.4.0/jackson-annotations-2.4.0.jar:/home/hadoopuser/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.4.2/jackson-core-2.4.2.jar:/home/hadoopuser/.m2/repository/com/github/joshelser/dropwizard-metrics-hadoop-metrics2-reporter/0.1.2/dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-service/2.1.1/hive-service-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-llap-server/2.1.1/hive-llap-server-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-llap-common/2.1.1/hive-llap-common-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/slider/slider-core/0.90.2-incubating/slider-core-0.90.2-incubating.jar:/home/hadoopuser/.m2/repository/com/beust/jcommander/1.30/jcommander-1.30.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-yarn-registry/2.7.1/hadoop-yarn-registry-2.7.1.jar:/home/hadoopuser/.m2/repository/com/sun/jersey/jersey-json/1.9/jersey-json-1.9.jar:/home/hadoopuser/.m2/repository/com/sun/xml/bind/jaxb-impl/2.2.3-1/jaxb-impl-2.2.3-1.jar:/home/hadoopuser/.m2/repository/com/sun/jersey/jersey-server/1.9/jersey-server-1.9.jar:/home/hadoopuser/.m2/repository/com/google/inject/extensions/guice-servlet/3.0/guice-servlet-3.0.jar:/home/hadoopuser/.m2/repository/org/apache/hbase/hbase-hadoop2-compat/1.1.1/hbase-hadoop2-compat-1.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/commons/commons-math/2.2/commons-math-2.2.jar:/home/hadoopuser/.m2/repository/org/apache/hbase/hbase-server/1.1.1/hbase-server-1.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hbase/hbase-procedure/1.1.1/hbase-procedure-1.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hbase/hbase-common/1.1.1/hbase-common-1.1.1-tests.jar:/home/hadoopuser/.m2/repository/org/apache/hbase/hbase-prefix-tree/1.1.1/hbase-prefix-tree-1.1.1.jar:/home/hadoopuser/.m2/repository/org/mortbay/jetty/jetty-sslengine/6.1.26/jetty-sslengine-6.1.26.jar:/home/hadoopuser/.m2/repository/org/mortbay/jetty/jsp-2.1/6.1.14/jsp-2.1-6.1.14.jar:/home/hadoopuser/.m2/repository/org/mortbay/jetty/jsp-api-2.1/6.1.14/jsp-api-2.1-6.1.14.jar:/home/hadoopuser/.m2/repository/org/mortbay/jetty/servlet-api-2.5/6.1.14/servlet-api-2.5-6.1.14.jar:/home/hadoopuser/.m2/repository/com/lmax/disruptor/3.3.0/disruptor-3.3.0.jar:/home/hadoopuser/.m2/repository/org/apache/hbase/hbase-hadoop-compat/1.1.1/hbase-hadoop-compat-1.1.1.jar:/home/hadoopuser/.m2/repository/net/sf/jpam/jpam/1.1/jpam-1.1.jar:/home/hadoopuser/.m2/repository/tomcat/jasper-compiler/5.5.23/jasper-compiler-5.5.23.jar:/home/hadoopuser/.m2/repository/javax/servlet/jsp-api/2.0/jsp-api-2.0.jar:/home/hadoopuser/.m2/repository/ant/ant/1.6.5/ant-1.6.5.jar:/home/hadoopuser/.m2/repository/tomcat/jasper-runtime/5.5.23/jasper-runtime-5.5.23.jar:/home/hadoopuser/.m2/repository/commons-el/commons-el/1.0/commons-el-1.0.jar:/home/hadoopuser/.m2/repository/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar:/home/hadoopuser/.m2/repository/org/jamon/jamon-runtime/2.3.1/jamon-runtime-2.3.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-serde/2.1.1/hive-serde-2.1.1.jar:/home/hadoopuser/.m2/repository/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jar:/home/hadoopuser/.m2/repository/org/apache/parquet/parquet-hadoop-bundle/1.8.1/parquet-hadoop-bundle-1.8.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-metastore/2.1.1/hive-metastore-2.1.1.jar:/home/hadoopuser/.m2/repository/javolution/javolution/5.5.1/javolution-5.5.1.jar:/home/hadoopuser/.m2/repository/com/jolbox/bonecp/0.8.0.RELEASE/bonecp-0.8.0.RELEASE.jar:/home/hadoopuser/.m2/repository/org/apache/derby/derby/10.10.2.0/derby-10.10.2.0.jar:/home/hadoopuser/.m2/repository/org/datanucleus/datanucleus-api-jdo/4.2.1/datanucleus-api-jdo-4.2.1.jar:/home/hadoopuser/.m2/repository/org/datanucleus/datanucleus-rdbms/4.1.7/datanucleus-rdbms-4.1.7.jar:/home/hadoopuser/.m2/repository/commons-pool/commons-pool/1.5.4/commons-pool-1.5.4.jar:/home/hadoopuser/.m2/repository/commons-dbcp/commons-dbcp/1.4/commons-dbcp-1.4.jar:/home/hadoopuser/.m2/repository/javax/jdo/jdo-api/3.0.1/jdo-api-3.0.1.jar:/home/hadoopuser/.m2/repository/javax/transaction/jta/1.1/jta-1.1.jar:/home/hadoopuser/.m2/repository/org/datanucleus/javax.jdo/3.2.0-m3/javax.jdo-3.2.0-m3.jar:/home/hadoopuser/.m2/repository/javax/transaction/transaction-api/1.1/transaction-api-1.1.jar:/home/hadoopuser/.m2/repository/co/cask/tephra/tephra-api/0.6.0/tephra-api-0.6.0.jar:/home/hadoopuser/.m2/repository/co/cask/tephra/tephra-core/0.6.0/tephra-core-0.6.0.jar:/home/hadoopuser/.m2/repository/com/google/inject/guice/3.0/guice-3.0.jar:/home/hadoopuser/.m2/repository/javax/inject/javax.inject/1/javax.inject-1.jar:/home/hadoopuser/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar:/home/hadoopuser/.m2/repository/com/google/inject/extensions/guice-assistedinject/3.0/guice-assistedinject-3.0.jar:/home/hadoopuser/.m2/repository/it/unimi/dsi/fastutil/6.5.6/fastutil-6.5.6.jar:/home/hadoopuser/.m2/repository/org/apache/twill/twill-common/0.6.0-incubating/twill-common-0.6.0-incubating.jar:/home/hadoopuser/.m2/repository/org/apache/twill/twill-core/0.6.0-incubating/twill-core-0.6.0-incubating.jar:/home/hadoopuser/.m2/repository/org/apache/twill/twill-api/0.6.0-incubating/twill-api-0.6.0-incubating.jar:/home/hadoopuser/.m2/repository/org/apache/twill/twill-discovery-api/0.6.0-incubating/twill-discovery-api-0.6.0-incubating.jar:/home/hadoopuser/.m2/repository/org/apache/twill/twill-discovery-core/0.6.0-incubating/twill-discovery-core-0.6.0-incubating.jar:/home/hadoopuser/.m2/repository/org/apache/twill/twill-zookeeper/0.6.0-incubating/twill-zookeeper-0.6.0-incubating.jar:/home/hadoopuser/.m2/repository/co/cask/tephra/tephra-hbase-compat-1.0/0.6.0/tephra-hbase-compat-1.0-0.6.0.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-shims/2.1.1/hive-shims-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/shims/hive-shims-common/2.1.1/hive-shims-common-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/shims/hive-shims-0.23/2.1.1/hive-shims-0.23-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-yarn-server-resourcemanager/2.6.1/hadoop-yarn-server-resourcemanager-2.6.1.jar:/home/hadoopuser/.m2/repository/com/sun/jersey/contribs/jersey-guice/1.9/jersey-guice-1.9.jar:/home/hadoopuser/.m2/repository/org/codehaus/jettison/jettison/1.1/jettison-1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-yarn-server-applicationhistoryservice/2.6.1/hadoop-yarn-server-applicationhistoryservice-2.6.1.jar:/home/hadoopuser/.m2/repository/org/apache/hadoop/hadoop-yarn-server-web-proxy/2.6.1/hadoop-yarn-server-web-proxy-2.6.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/shims/hive-shims-scheduler/2.1.1/hive-shims-scheduler-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-service-rpc/2.1.1/hive-service-rpc-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/httpcomponents/httpclient/4.4/httpclient-4.4.jar:/home/hadoopuser/.m2/repository/org/apache/httpcomponents/httpcore/4.4/httpcore-4.4.jar:/home/hadoopuser/.m2/repository/org/apache/curator/curator-framework/2.6.0/curator-framework-2.6.0.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-exec/2.1.1/hive-exec-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-ant/2.1.1/hive-ant-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-llap-tez/2.1.1/hive-llap-tez-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/hive/hive-llap-client/2.1.1/hive-llap-client-2.1.1.jar:/home/hadoopuser/.m2/repository/org/apache/commons/commons-lang3/3.1/commons-lang3-3.1.jar:/home/hadoopuser/.m2/repository/commons-httpclient/commons-httpclient/3.0.1/commons-httpclient-3.0.1.jar:/home/hadoopuser/.m2/repository/org/apache/logging/log4j/log4j-1.2-api/2.4.1/log4j-1.2-api-2.4.1.jar:/home/hadoopuser/.m2/repository/org/apache/logging/log4j/log4j-api/2.4.1/log4j-api-2.4.1.jar:/home/hadoopuser/.m2/repository/org/apache/logging/log4j/log4j-core/2.4.1/log4j-core-2.4.1.jar:/home/hadoopuser/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar:/home/hadoopuser/.m2/repository/org/antlr/antlr-runtime/3.4/antlr-runtime-3.4.jar:/home/hadoopuser/.m2/repository/org/antlr/stringtemplate/3.2.1/stringtemplate-3.2.1.jar:/home/hadoopuser/.m2/repository/antlr/antlr/2.7.7/antlr-2.7.7.jar:/home/hadoopuser/.m2/repository/org/antlr/ST4/4.0.4/ST4-4.0.4.jar:/home/hadoopuser/.m2/repository/org/apache/ant/ant/1.9.1/ant-1.9.1.jar:/home/hadoopuser/.m2/repository/org/apache/ant/ant-launcher/1.9.1/ant-launcher-1.9.1.jar:/home/hadoopuser/.m2/repository/org/apache/commons/commons-compress/1.9/commons-compress-1.9.jar:/home/hadoopuser/.m2/repository/org/apache/ivy/ivy/2.4.0/ivy-2.4.0.jar:/home/hadoopuser/.m2/repository/org/codehaus/groovy/groovy-all/2.4.4/groovy-all-2.4.4.jar:/home/hadoopuser/.m2/repository/org/datanucleus/datanucleus-core/4.1.6/datanucleus-core-4.1.6.jar:/home/hadoopuser/.m2/repository/org/apache/calcite/calcite-core/1.6.0/calcite-core-1.6.0.jar:/home/hadoopuser/.m2/repository/org/apache/calcite/calcite-linq4j/1.6.0/calcite-linq4j-1.6.0.jar:/home/hadoopuser/.m2/repository/net/hydromatic/eigenbase-properties/1.1.5/eigenbase-properties-1.1.5.jar:/home/hadoopuser/.m2/repository/org/codehaus/janino/janino/2.7.6/janino-2.7.6.jar:/home/hadoopuser/.m2/repository/org/codehaus/janino/commons-compiler/2.7.6/commons-compiler-2.7.6.jar:/home/hadoopuser/.m2/repository/org/pentaho/pentaho-aggdesigner-algorithm/5.1.5-jhyde/pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar:/home/hadoopuser/.m2/repository/org/apache/calcite/calcite-avatica/1.6.0/calcite-avatica-1.6.0.jar:/home/hadoopuser/.m2/repository/com/google/code/gson/gson/2.2.4/gson-2.2.4.jar:/home/hadoopuser/.m2/repository/stax/stax-api/1.0.1/stax-api-1.0.1.jar:/home/hadoopuser/.m2/repository/jline/jline/2.12/jline-2.12.jar:/usr/local/idea-IU-171.4073.35/lib/idea_rt.jar org.training.hadoop.kafka.KafkaConsumerExample
Connected to the target VM, address: '127.0.0.1:55147', transport: 'socket'
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoopuser/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoopuser/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
name: pool-1-thread-1 ; topic: test ; offset19 ; key: 19 ; value: 19
name: pool-1-thread-1 ; topic: test ; offset20 ; key: 20 ; value: 20
name: pool-1-thread-1 ; topic: test ; offset21 ; key: 21 ; value: 21
name: pool-1-thread-1 ; topic: test ; offset22 ; key: 22 ; value: 22
name: pool-1-thread-1 ; topic: test ; offset23 ; key: 23 ; value: 23
name: pool-1-thread-1 ; topic: test ; offset24 ; key: 24 ; value: 24
name: pool-1-thread-1 ; topic: test ; offset25 ; key: 25 ; value: 25
name: pool-1-thread-1 ; topic: test ; offset26 ; key: 26 ; value: 26
name: pool-1-thread-1 ; topic: test ; offset27 ; key: 27 ; value: 27
name: pool-1-thread-1 ; topic: test ; offset28 ; key: 28 ; value: 28
name: pool-1-thread-1 ; topic: test ; offset29 ; key: 29 ; value: 29
name: pool-1-thread-1 ; topic: test ; offset30 ; key: 30 ; value: 30
name: pool-1-thread-1 ; topic: test ; offset31 ; key: 31 ; value: 31