欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

kafka环境搭建和使用(python API)

程序员文章站 2022-06-30 18:46:49
引言 上一篇文章了解了kafka的重要组件zookeeper,用来保存broker、consumer等相关信息,做到平滑扩展。这篇文章就实际操作部署下kafka,用几个简单的例子加深对kafka的理解,学会基本使用kafka。 环境搭建 我将会在本地部署一个三台机器的zookeeper集群,和一个2 ......

引言

上一篇文章了解了kafka的重要组件zookeeper,用来保存broker、consumer等相关信息,做到平滑扩展。这篇文章就实际操作部署下kafka,用几个简单的例子加深对kafka的理解,学会基本使用kafka。

环境搭建

我将会在本地部署一个三台机器的zookeeper集群,和一个2台机器的kafka集群。

zookeeper集群

zookeeper的搭建可以看我的上一篇文章

producer 向broker发送消息

bootstrap_servers是kafka集群地址信息,下面事项主题user-event发送一条消息,send发送消息是异步的,会马上返回,因此我们要通过阻塞的方式等待消息发送成功(或者flush()也可以,flush会阻塞知道所有log都发送成功),否则消息可能会发送失败,但也不会有提示,关于上面这个可以通过删除send之后的语句试试,会发现broker不会收到消息,然后在send后加上time.sleep(10)之后,会看到broker收到消息。

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=[
        "localhost:9093",
  "localhost:9094"
  ]
)

future = producer.send("user-event", b'I am rito yan')
try:
    record_metadata = future.get(timeout=10)
    print_r(record_metadata)
except KafkaError as e:
    print(e)

阻塞等待发送成功之后,会看到返回插入记录的信息:
RecordMetadata(topic='user-event', partition=7, topic_partition=TopicPartition(topic='user-event', partition=7), offset=1, timestamp=1528034253757, checksum=None, serialized_key_size=-1, serialized_value_size=13),里面包括了插入log的主题、分区等信息。

格式化发送的信息

创建producer的时候可以通过value_serializer指定格式化函数,比如我们数据是个dict,可以指定格式化函数,将dict转化为byte:

import json

producer = KafkaProducer(
    bootstrap_servers=[
        "localhost:9093",
        "localhost:9094"
    ],
    value_serializer=lambda m: json.dumps(m).encode('ascii')
)

future = producer.send("user-event", {
    "name": "燕睿涛",
    "age": 26,
    "friends": [
        "ritoyan",
        "luluyrt"
    ]
})

这样就可以将格式化之后的信息发送给broker,不用每次发送的时候都自己格式化,真是不要太好用。

consumer 消费数据

创建一个consumer,其中group_id是分组,broker中的每一个数据只能被consumer组中的一个consumer消费。

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user-event",
    group_id = "user-event-test",
    bootstrap_servers = [
        "localhost:9093",
        "localhost:9094"
    ]
)
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

启动之后,进程会一直阻塞在哪里,等broker中有消息的时候就会去消费,启动多个进程,只要保证group_id一致,就可以保证消息只被组内的一个consumer消费,上面的程序会输出:

user-event:8:2: key=None value=b'{"name": "\\u71d5\\u777f\\u6d9b", "age": 26, "friends": ["ritoyan", "luluyrt"]}'

同样,进入的时候有value_serializer,出来的时候对应的也有value_deserializer,消费者可以配置value_deserializer来格式化内容,跟producer对应起来

consumer = KafkaConsumer(
    "user-event",
  group_id = "user-event-test",
  bootstrap_servers = [
        "localhost:9093",
  "localhost:9094"
  ],
  value_deserializer=lambda m: json.loads(m.decode('ascii'))
)

输出内容user-event:8:3: key=None value={'name': '燕睿涛', 'age': 26, 'friends': ['ritoyan', 'luluyrt']}

kafka其他命令

查看分组

我们的consumer可能有很多分组,可以通过西面的命令查看分组信息:

cd /path/to/kafka
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093,localhost:9094 --list

可以看到我使用中的分组有4个,分别如下所示

clock-tick-test3
user-event-test
clock-tick-test2
clock-tick-test

查看特定分组信息

可以通过bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9093 --group user-event-test --describe,查看分组user-event-test的信息,可以看到西面的信息,包含消费的主题、分区信息,以及consumer在分区中的offset和分区的总offset。(为了格式化显示,删了部分列的部分字母)

TOPIC       PARTITION   CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID HOST    CLIENT-ID
user-event  3   0   0   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  0   0   0   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  1   1   1   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  2   1   1   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  4   0   0   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  9   1   1   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  8   4   4   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  7   2   2   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  6   1   1   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  5   0   0   0   kafka-python-78517 /127.0.0.1   kafka-python

结语

至此,kafka的基本使用算是掌握了,以后要是有机会在项目中实践就好了,在实际工程中的各种问题可以更加深刻的理解其中的原理。