欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python3操作kafka

程序员文章站 2024-02-23 21:10:28
...

第三方模板安装

pip install pykafka
pip install kafka-python

代码实现

# coding: utf-8
from kafka import KafkaProducer
from kafka import KafkaConsumer
from pykafka import KafkaClient
from BaseOperationFile.yamlbase import DoYaml
import time

kafka_config_path = r'.\config\kafka.yaml'
class DoKafka():
    """功能:直发Kafka交易串到环境"""
    def __init__(self):
        """kafka初始化,建立生产者"""
        kafka_config = DoYaml(kafka_config_path).yaml_data
        # bootstrap_servers格式:['10.201.83.89:9092', '10.201.83.90:9092']
        self.bootstrap_servers = kafka_config['bootstrap_servers']
        st_time = time.time()
        self.producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
        end_time = time.time()
        w_time = end_time - st_time
        print('kafka连接时间:'+str(w_time)[:-4])

    def senfKafka(self,msg,topic):
        """发送Kafka消息"""
        msg = bytes(msg, encoding='utf8')
        self.producer.send(topic=topic, value=msg)
        self.producer.flush()

    def printToipicMasages(self,msg,topic):
        """发送Kafka消息"""
        consumers = KafkaConsumer(topic,bootstrap_servers=self.bootstrap_servers)
        for consumer in consumers:
            msg = consumer.value
            # 取order_no
            try:
                str_msg = str(msg, 'utf8')
                print(str_msg)
            except Exception as e:
                print(e)

    # 查看集群主题

    def getTopics(self):
        client = KafkaClient(hosts=self.bootstrap_servers[0])
        for topic in client.topics:
            print(topic)



    def closeProduce(self):
        """关闭生产者"""
        self.producer.close()

if __name__ == '__main__':
    t = DoKafka()
    t.getTopics()
    t.closeProduce()

注意事项

1.window下加kafka机器别名:
C:\Windows\System32\drivers\etc\hosts

python3操作kafka