Message Middleware --- Kafka Quick Start

程序员文章站 2024-01-30 15:26:34

Message middleware: https://blog.51cto.com/u_9291927/category33

GitHub: scorpiostudio/HelloKafka

Learning Python 3 (54): Using the confluent-kafka module

From:https://blog.csdn.net/liao392781/article/details/90487438

confluent-kafka is a Python module: a lightweight wrapper around librdkafka, which is itself a Kafka client library written in C/C++, so it performs well. In practice it is generally preferable to kafka-python. confluent-kafka-python is Confluent's Python client for Apache Kafka and the Confluent Platform.
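Because the Python module is a wrapper, two versions matter in practice: that of the Python package and that of the librdkafka build underneath it. The following is a minimal sketch for reporting both. The helper name `describe_client` is hypothetical and not part of the library; the import is deferred so the function returns `None` when confluent-kafka is not installed.

```python
def describe_client():
    """Report the confluent-kafka and librdkafka versions, or None if not installed.

    describe_client is an illustrative helper name, not a library function.
    """
    try:
        import confluent_kafka
    except ImportError:
        # The package is typically installed with: pip install confluent-kafka
        return None
    # Both calls return a (version string, version integer) tuple.
    py_version, _ = confluent_kafka.version()
    lib_version, _ = confluent_kafka.libversion()
    return {"confluent-kafka": py_version, "librdkafka": lib_version}


if __name__ == "__main__":
    print(describe_client())
```

Checking both values is useful when reporting problems, since many behaviors and configuration properties come from librdkafka rather than from the Python layer.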

Features:

Sample code:

# -*- coding: utf-8 -*-
# @Author  : 
# @Date    :
# @File    : kafka_operate.py
# @description : XXX


import time
import datetime
# The avro submodule needs the optional Avro dependencies:
#   pip install "confluent-kafka[avro]"
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka import Producer, Consumer, KafkaError


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


def kafka_producer():
    p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})
    while True:
        try:
            current_date = str(datetime.datetime.now().replace(microsecond=0))
            data = current_date
            # Trigger any available delivery report callbacks from previous produce() calls
            p.poll(0)

            # Asynchronously produce a message, the delivery report callback
            # will be triggered from poll() above, or flush() below, when the message has
            # been successfully delivered or failed permanently.
            p.produce('my_topic', data.encode('utf-8'), callback=delivery_report)
            time.sleep(1)
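        except KeyboardInterrupt:
            # Ctrl-C ends the loop so that flush() below still runs.
            break
        except Exception as e:
            # Any other error (e.g. BufferError when the local queue is full) stops the demo.
            print(e)
            break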
    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    p.flush()


def kafka_consumer():
    c = Consumer({
        'bootstrap.servers': 'mybroker',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest'
    })
    c.subscribe(['my_topic'])
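    try:
        while True:
            # Wait up to one second for the next message.
            msg = c.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            print('Received message: {}'.format(msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        pass
    finally:
        # Always close the consumer so it leaves the group cleanly.
        c.close()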


def kafka_avro_producer():
    value_schema_str = """
    {
       "namespace": "my.test",
       "name": "value",
       "type": "record",
       "fields" : [
         {
           "name" : "name",
           "type" : "string"
         }
       ]
    }
    """

    key_schema_str = """
    {
       "namespace": "my.test",
       "name": "key",
       "type": "record",
       "fields" : [
         {
           "name" : "name",
           "type" : "string"
         }
       ]
    }
    """

    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    value = {"name": "Value"}
    key = {"name": "Key"}

    avro_producer = AvroProducer({
        'bootstrap.servers': 'mybroker,mybroker2',
        'schema.registry.url': 'http://schema_registry_host:port'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

    avro_producer.produce(topic='my_topic', value=value, key=key)
    avro_producer.flush()


def kafka_avro_consumer():
    c = AvroConsumer({
        'bootstrap.servers': 'mybroker,mybroker2',
        'group.id': 'groupid',
        'schema.registry.url': 'http://127.0.0.1:8081'})
    c.subscribe(['my_topic'])
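    try:
        while True:
            try:
                msg = c.poll(10)
            except SerializerError as e:
                # msg is not assigned when poll() raises, so report only the error.
                print("Message deserialization failed: {}".format(e))
                break
            if msg is None:
                continue
            if msg.error():
                print("AvroConsumer error: {}".format(msg.error()))
                continue
            print(msg.value())
    except KeyboardInterrupt:
        pass
    finally:
        # Always close the consumer so it leaves the group cleanly.
        c.close()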


if __name__ == '__main__':
    pass
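The `__main__` block in the listing does nothing, so none of the four functions runs when the file is executed. One possible way to wire them up is a small command-line dispatcher. This is only a sketch: `build_parser`, `dispatch` and the role names are hypothetical and do not come from the original article.

```python
import argparse


def build_parser(roles):
    """Build a parser that accepts exactly one of the given role names."""
    parser = argparse.ArgumentParser(description="Run one of the Kafka demo functions.")
    parser.add_argument("role", choices=sorted(roles), help="which demo function to run")
    return parser


def dispatch(argv, handlers):
    """Parse argv and call the handler registered for the chosen role.

    handlers maps a role name (str) to a zero-argument callable.
    """
    args = build_parser(handlers).parse_args(argv)
    return handlers[args.role]()


# Possible use inside kafka_operate.py (names assumed from the listing above):
#
#     import sys
#     dispatch(sys.argv[1:], {
#         "producer": kafka_producer,
#         "consumer": kafka_consumer,
#         "avro-producer": kafka_avro_producer,
#         "avro-consumer": kafka_avro_consumer,
#     })
```

Passing the handlers in as a dictionary keeps the dispatcher independent of Kafka, so it can be exercised with stand-in functions before any broker is available.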