欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python消息中间件 kafka

程序员文章站 2024-01-30 15:22:40
...

下载与安装 Kafka

wget http://apache.fayea.com/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -xzvf kafka_2.12-2.2.1.tgz 
cd kafka_2.12-2.2.1

# 先启动zookeeper(kafka依赖)
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动失败 @1 缺少jdk

# 启动kafka
bin/kafka-server-start.sh config/server.properties

@1 安装JDK8

# 首先下载,需要登录
https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
tar -xzvf jdk-8u211-linux-x64.tar.gz
mv jdk1.8.0_211 /opt/jdk1.8


vim /etc/profile

## JAVA

export JAVA_HOME=/opt/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
#:wq 保存退出

source /etc/profile
# 验证
java -version

发送与接受消息

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

# --创建一个 名字 test1
# --副本数量为 1
# --partitions数量为1 的 topic

Created topic test1. # 创建成功

# 查看当前kafka中 存在哪些topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

# 一般不需要创建topic,插入数据后,如果没有,则会自动创建

# 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
# 消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning

kafka python 开发

# pip install kafka-python

from kafka import KafkaProducer
import names

producer = KafkaProducer()

for _ in range(10):
    name = names.get_full_name()
    future = producer.send('test',bytes(name,'ascii'))
    result = future.get(60)
    print(result)
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',group_id='test01')

for msg in consumer:
    print(msg)