欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

实时数仓实践从0到1之路之kafka安装部署以及案例程序演示

程序员文章站 2022-06-14 14:57:02
...

STEP 1: GET KAFKA

  • Download installation package:

  • kafka https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz

$ tar -xzf kafka_2.12-2.6.0.tgz
$ cd kafka_2.12-2.6.0

STEP 2: START THE KAFKA ENVIRONMENT

  • 注: 本地环境Java 8 以上

按正确的顺序启动以下命令:

  • 开启zk服务
  • 注:zk非kafka必要服务
$ bin/zookeeper-server-start.sh config/zookeeper.properties

打开另外一个终端:开启Kafka broker 服务

$ bin/kafka-server-start.sh config/server.properties

等所有服务都启动成功,我们就有了kafka基础运行环境并准备使用它

STEP 3: CREATE A TOPIC TO STORE YOUR EVENTS

Kafka是一个分布式事件流平台,它允许您跨多台机器读、写、存储和处理事件(在文档中也称为记录或消息)。
事件可以是支付事务、来自移动电话的地理位置更新、发货订单、来自物联网设备或医疗设备的传感器测量等等。这些事件被组织并存储在主题中。非常简单,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。
因此,在编写第一个事件之前,必须创建一个主题.
打开另外一个终端:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

如下命令可以活动更多的kafka运行信息:
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0

STEP 4: WRITE SOME EVENTS INTO THE TOPIC

Kafka客户端通过网络与Kafka代理通信以写(或读取)事件。一旦接收到这些事件,代理将以持久和容错的方式存储这些事件,您需要多长时间就存储多长时间——甚至永远存储。
运行控制台生成器客户端,在主题中写入一些事件。
默认情况下,您输入的每一行都将导致向主题写入一个单独的事件。

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

你可以使用Ctrl + C 在任何时间停止这个生产者客户端

STEP 5: READ THE EVENTS

打开另一个终端会话并运行控制台消费者客户端来读取您刚刚创建的事件:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

你可以使用Ctrl + C 在任何时间停止这个消费者客户端

您可以*试验:例如,切换回您的生产者终端(上一步)来编写额外的事件,并查看事件如何立即显示在您的消费者终端中。
因为事件长期存储在Kafka中,所以它们可以被任意多的消费者读取。您可以通过打开另一个终端会话并再次运行之前的命令来轻松地验证这一点。

STEP 6: IMPORT/EXPORT YOUR DATA AS STREAMS OF EVENTS WITH KAFKA CONNECT

您可能在关系数据库或传统消息传递系统等现有系统中拥有大量数据,以及许多已经使用这些系统的应用程序。Kafka Connect允许您不断地从外部系统摄取数据到Kafka,反之亦然。因此,与Kafka集成现有系统是非常容易的。为了使这个过程更容易,有数百个这样的连接器可用。
看一看Kafka Connect部分,了解更多关于如何持续地导入/导出数据到Kafka和导出。

  • https://kafka.apache.org/documentation/#connect

STEP 7: PROCESS YOUR EVENTS WITH KAFKA STREAMS

一旦您的数据以事件的形式存储在Kafka中,您就可以使用Kafka Streams客户端库处理数据。它允许您实现关键任务的实时应用程序和微服务,其中输入或输出数据存储在Kafka主题中。Kafka Streams将客户端编写和部署标准Java和Scala应用程序的简单性与Kafka服务器端集群技术的优势结合在一起,使这些应用程序具有高度的可伸缩性、弹性、容错性和分布式。这个库完全支持一次处理、有状态操作和聚合、窗口、连接、基于事件时间的处理等等。

给你一个初步的体验,这里是如何实现流行的WordCount算法:

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long()));

之前的生产者和消费者窗口可以关闭了,然后进行wordcount的演示:

  • https://kafka.apache.org/25/documentation/streams/quickstart
  • https://kafka.apache.org/25/documentation/streams/tutorial

创建输入topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic streams-plaintext-input

创建输出topic 并启用压缩

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact

此条命令可以查看topics列表

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

启动wordcount演示程序

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

写入数据

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

程序输出运行结果

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
输入数据
hello world
all streams lead to kafka
hello world
输出结果
hello	1
world	1
all	1
streams	1
lead	1
to	1
kafka	1
hello	2
world	2

STEP 8: TERMINATE THE KAFKA ENVIRONMENT

kafka基础演示已经完毕,然后进行环境终止

  • Ctrl-C停止生产者和消费者客户端
  • 用Ctrl-C停止Kafka代理
  • 最后,用Ctrl-C停止ZooKeeper服务器。
    如果您还想删除您本地Kafka环境的任何数据,包括您在此过程中创建的任何事件,运行命令:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper

更新的有点慢,各位看官如果有兴趣,来个一键三连!!!谢谢!