欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

从安装Kafka服务到运行WordCount程序

程序员文章站 2022-03-04 13:49:03
...

之所以写这篇文章,是因为Kafka初学的同学在了解了Kafka的基本原理之后,希望在自己的机器上面运行最简单的wordCount的时候,从开始安装Kafka到找到合适的example源码最后到成功运行,这个过程会花费几个小时甚至一天的时间。主要是现今网上Kafka的博客中偏向原理分析的占大多数,讲解程序实例的比较零零散散,特别是一些博客的实例代码中版本不清。本文希望给读者就如何运行第一个Kafka的WordCount程序一个清晰的认识,会表明所运行的环境,kafka版本以及版本切换和控制。

本文假设你已经初步了解什么是Kafka。

1. 总体步骤:

1.安装伪分布Kafka服务器。包括zookeeper和Kafka服务器。(Ubuntu中)
2.使用Intelij IDEA运行源码。其中使用到Maven进行Kafka的版本控制,方便选择适当的kafka版本。(Windows中)

2. Kafka服务器安装

这里本人把kafka服务器安装在Ubuntu上面,并且只是用到了一台服务器,所以使用的是伪分布的方式。这样的话Zookeeper就没有起到“管理集群”的作用,所以这里我们就不用单独安装Zookeeper,只需要启动Kafka安装包中zookeeper实例就可以了。如果已经单独安装了zookeeper(集群),可以直接拿来使用。

这里的Kafka服务器的版本可以直接使用下面所示的,在本人使用过程中,即使编程所使用的kafka依赖包跟这里的版本不一致,也不影响。因为本示例中的服务器只是为了提供类似于消息队列的功能,e.g, 数据生产者->接收->存储->发送数据->数据消费者。而编程所使用的kafka依赖包所编写的代码没有提交到这个服务器上面执行,而是在另外一个环境中执行。(如果需要提交程序到这个Kafka服务器中,就另当别论了)。

下载安装包之后,解压,进入所解压的目录:

> tar -xzf kafka_2.11-2.0.0.tgz
> cd kafka_2.11-2.0.0

启动zookeeper实例:

> bin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka服务器:

> bin/kafka-server-start.sh config/server.properties

具体官方安装教程在这里。

这里需要注意的是请使用两个窗口分别执行上面两行linux命令,同时不要关闭CMD窗口,因为关闭了窗口也就关闭了Kafka服务器。 如果希望关闭CMD窗口之后不影响服务,可以参考这个.

本人在一台Linux机器上面运行了上面所述的服务,其中使用下面地址访问zookeeper:164.1x5.1x1.1x8:2181,使用下面地址访问kafka服务:164.1x5.1x1.1x8:9092. 如果是本机机器,可以使用localhost代替IP地址。接下来就是运行代码的时候了。

3. 运行WordCount程序

根据本人的经验,非常推荐大家使用集成开发环境(IDE)中的Intelij IDEA和代码版本控制工具Maven,IDEA已经非常“完美”地支持了Maven,也就是在IDEA中只需要使用鼠标就能够使用Maven的功能。
还有,本人使用的是1.8版的JDK,为的是能够使用Java的Lambda表达式,代码写起来很爽。

打开IDEA之后,新建一个工程,可以在新建工程的时候选择该工程支持Maven,也可以在建立好工程之后再选择。这里本人使用后者,感觉这样快一点。具体操作为:

鼠标点击工程名字,右键 --> add framework support --> 找到maven选上

之后会自动在项目目录中生成一个pom.xml文件,把下面的代码(为了排版,贴在文章后后面了)添加进去,就可以自动导入相应版本的Kafka依赖包了。这里使用了1.0.0版本的kafka。

下面在IDEA的新项目中新建一个java文件,copy和paste源码。我们从官方的GitHub的repository中得到,地址为:这里 。从这个链接中是可以找到其他的官方example。

修改一下zookeeper服务器的地址:(请以自己的为准)

 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "164.1x5.1x1.1x8:9092");

为了打印出结果,增加一句打印语句

counts.print();

完整代码将在文末贴出。

因为我们的wordcount程序只是一个消费者,也就是它只是单纯地从kafka服务器中获取数据并完成word count的数据处理操作,因此,我们还需要生产数据并feed给kafka服务器。如何生产数据,生产什么样的数据呢?这里大家不用担心,在文末已经贴出了数据生产者的代码,只需要把zookeeper和kafka server的IP地址替换成自己的就可以使用了。运行方式为,先运行数据producer,然后运行数据消费者,即wordcount

最后的结果大概是这种形式(每次运行都不一样):

[KSTREAM-AGGREGATE-0000000004]: kafka, (41<-null)
[KSTREAM-AGGREGATE-0000000004]: streams, (21<-null)
[KSTREAM-AGGREGATE-0000000004]: is, (21<-null)
[KSTREAM-AGGREGATE-0000000004]: a, (21<-null)
[KSTREAM-AGGREGATE-0000000004]: client, (41<-null)
[KSTREAM-AGGREGATE-0000000004]: library, (21<-null)
[KSTREAM-AGGREGATE-0000000004]: for, (21<-null)
[KSTREAM-AGGREGATE-0000000004]: building, (21<-null)
[KSTREAM-AGGREGATE-0000000004]: applications, (41<-null)
[KSTREAM-AGGREGATE-0000000004]: and, (81<-null)
[KSTREAM-AGGREGATE-0000000004]: microservices,, (21<-null)

最后,希望本文能够给你一个关于如何执行Kafka程序的清晰认识,希望你能够在很短的时间内成功执行Kafka的Wordcount实例程序。

谢谢


下面是数据生产者的代码, 在使用之前请修改一下IP地址:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProductor {

    private static final String testString = "Kafka Streams is a client library for " +
            "building applications and microservices, where the input and output data " +
            "are stored in Kafka clusters. It combines the simplicity of writing and" +
            " deploying standard Java and Scala applications on the client side with the " +
            "benefits of Kafka's server-side cluster technology.";

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();

        properties.put("bootstrap.servers", "164.1*5.1*1.1*8:9092");

        properties.put("metadata.broker.list", "164.1*5.1*1.1*8:9092");

        /**
         * "key.serializer" 的类型根据ProducerRecord<Integer,String>中的类型来确定,
         * Integer对应的为IntegerSerializer,String对应的为StringSerializer
         * key.serializer和value.serializer根据定义的ProducerRecord类型来对应
         */
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        /**
         * KeyedMessage中"test-topic"为topic的名字,"test-message"为消息内容
         * 6为对应的key值
         * "hello"为对应的value值
         */
        for(int i = 0; i < 1; i++){
            ProducerRecord<Integer,String> producerRecord = new ProducerRecord<>("streams-plaintext-input", 6, testString);

            kafkaProducer.send(producerRecord);
        }


        Thread.sleep(1000);

        kafkaProducer.close();

        System.out.println("product end");
    }
}

下面是wordcount的源码,copy自官方的example并做一点点修改。同样地,在使用之前修改一下IP地址:

package com.huai.kafka.learning.streamprocessing.wordcount2;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
 * that computes a simple word occurrence histogram from an input text.
 *
 * In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages
 * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
 * is an updated count of a single word.
 *
 * Before running this example you must create the input topic and the output topic (e.g. via
 * bin/kafka-topics.sh --create ...), and write some data to the input topic (e.g. via
 * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
 */
public class WordCount3 {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "164.1*5.1*1.1*8:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        // Note: To re-run the demo, you need to use the offset reset tool:
        // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");

        KTable<String, Long> counts = source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
                .groupBy((key, value) -> value)
                .count();

        // need to override value serde to Long type
        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
        counts.print();

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

下面是要添加在pom.xml文件中的内容,用来解决Kafka依赖包

 <properties>
        <!--<kafka.version>2.0.0-cp1</kafka.version>-->
        <kafka.version>1.0.0</kafka.version>
    </properties>


    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!--<dependency>-->
            <!--<groupId>org.apache.kafka</groupId>-->
            <!--<artifactId>kafka-streams-test-utils</artifactId>-->
            <!--<version>${kafka.version}</version>-->
            <!--<scope>test</scope>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams-test-utils</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>

    <!-- Example pom.xml snippet when using Maven to build your Java applications. -->
    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

相关标签: Kafka