python消息中间件 kafka
程序员文章站
2024-01-30 15:22:40
...
下载与安装 Kafka
wget http://apache.fayea.com/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -xzvf kafka_2.12-2.2.1.tgz
cd kafka_2.12-2.2.1
# 先启动zookeeper(kafka依赖)
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动失败 @1 缺少jdk
# 启动kafka
bin/kafka-server-start.sh config/server.properties
@1 安装JDK8
# 首先下载,需要登录
https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
tar -xzvf jdk-8u211-linux-x64.tar.gz
mv jdk1.8.0_211 /opt/jdk1.8
vim /etc/profile
## JAVA
export JAVA_HOME=/opt/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
#:wq 保存退出
source /etc/profile
# 验证
java -version
发送与接受消息
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
# --创建一个 名字 test1
# --副本数量为 1
# --partitions数量为1 的 topic
Created topic test1. # 创建成功
# 查看当前kafka中 存在哪些topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 一般不需要创建topic,插入数据后,如果没有,则会自动创建
# 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
# 消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
kafka python
开发
# pip install kafka-python
from kafka import KafkaProducer
import names
producer = KafkaProducer()
for _ in range(10):
name = names.get_full_name()
future = producer.send('test',bytes(name,'ascii'))
result = future.get(60)
print(result)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',group_id='test01')
for msg in consumer:
print(msg)
上一篇: 解决ztree搜索中多级菜单展示不全问题
下一篇: 消息中间件-kafka(一)
推荐阅读
-
消息中间件-kafka(一)
-
python消息中间件 kafka
-
关于 python gevent 架框 作为 TCP服务器 的 代码问题 , 每个 socket 的 消息 接收 是否有使用 事件监听回调的方法呢?
-
RabbitMQ 消息中间件搭建详解
-
资深程序员教你用Python如何调企业微信接口发送消息!叼的不行!
-
kafka自定义分区策略-将消息发送到我们指定的partition
-
SpringBoot开发案例之整合Kafka实现消息队列
-
kafka 消息发送和接收
-
案例:使用logstash收集游戏服务器日志,输出到kafka消息队列中,然后存入ES
-
Flume读取数据写入Kafka消息队列中