python3操作kafka
程序员文章站
2024-02-23 21:10:28
...
第三方模块安装
pip install pykafka
pip install kafka-python
代码实现
# coding: utf-8
from kafka import KafkaProducer
from kafka import KafkaConsumer
from pykafka import KafkaClient
from BaseOperationFile.yamlbase import DoYaml
import time
# Location of the Kafka YAML settings file. The path is relative (it starts
# with '.') and uses Windows-style backslash separators.
kafka_config_path = r'.\config\kafka.yaml'
class DoKafka():
    """Send transaction strings straight to Kafka and inspect the cluster.

    A producer is created on construction from the servers listed in the
    YAML file at ``kafka_config_path``; call ``closeProduce`` when done.
    """

    def __init__(self):
        """Load the Kafka settings and connect a producer.

        Prints how long the producer took to connect, in seconds.
        """
        kafka_config = DoYaml(kafka_config_path).yaml_data
        # bootstrap_servers format: ['10.201.83.89:9092', '10.201.83.90:9092']
        self.bootstrap_servers = kafka_config['bootstrap_servers']
        started = time.perf_counter()
        self.producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
        elapsed = time.perf_counter() - started
        # Format explicitly: slicing characters off str(float) printed an
        # empty or misleading value for short or exponent-form floats.
        print('kafka连接时间:' + format(elapsed, '.3f'))

    @staticmethod
    def _encode_message(msg):
        """Return ``msg`` as bytes, UTF-8 encoding it when it is a str."""
        if isinstance(msg, (bytes, bytearray)):
            return bytes(msg)
        return bytes(msg, encoding='utf8')

    @staticmethod
    def _hosts_string(servers):
        """Return ``servers`` as the comma-separated string pykafka expects.

        Accepts either a single 'host:port' string or an iterable of them.
        """
        if isinstance(servers, str):
            return servers
        return ','.join(servers)

    def senfKafka(self, msg, topic):
        """Send one message to ``topic`` and block until it is flushed.

        Args:
            msg: Message payload; a str is UTF-8 encoded, bytes are sent
                unchanged.
            topic: Name of the destination topic.
        """
        payload = self._encode_message(msg)
        self.producer.send(topic=topic, value=payload)
        self.producer.flush()

    def printToipicMasages(self, msg, topic):
        """Consume ``topic`` and print every message decoded as UTF-8.

        Args:
            msg: Unused; kept only so existing callers keep working.
            topic: Name of the topic to read from.
        """
        consumer = KafkaConsumer(topic, bootstrap_servers=self.bootstrap_servers)
        try:
            for record in consumer:
                try:
                    print(str(record.value, 'utf8'))
                except (UnicodeError, TypeError) as e:
                    # Best effort: report an undecodable or empty payload
                    # and keep consuming.
                    print(e)
        finally:
            # Release the consumer's connections even if iteration is
            # interrupted.
            consumer.close()

    # List the topics of the cluster.
    def getTopics(self):
        """Print every topic known to the cluster."""
        # Hand pykafka all configured brokers instead of only the first one,
        # so discovery does not depend on a single host.
        client = KafkaClient(hosts=self._hosts_string(self.bootstrap_servers))
        for topic in client.topics:
            print(topic)

    def closeProduce(self):
        """Close the producer."""
        self.producer.close()
if __name__ == '__main__':
    # Demo: connect, list the cluster topics, then release the producer.
    t = DoKafka()
    try:
        t.getTopics()
    finally:
        # Close the producer even when listing the topics fails.
        t.closeProduce()
注意事项
1. Windows 下需要在 hosts 文件中添加 Kafka 机器的别名(主机名映射):
C:\Windows\System32\drivers\etc\hosts
上一篇: UDS诊断看这篇就够了,吐血整理
下一篇: Oracle 自动故障诊断