Message Middleware --- Kafka Quick Start
Message middleware series: https://blog.51cto.com/u_9291927/category33
GitHub: https://github.com/scorpiostudio/HelloKafka
- Kafka Quick Start (1) --- Kafka Introduction: https://blog.51cto.com/9291927/2493953
- Kafka Quick Start (2) --- Kafka Architecture: https://blog.51cto.com/9291927/2497814
- Kafka Quick Start (3) --- Kafka Core Technologies: https://blog.51cto.com/9291927/2497820
- Kafka Quick Start (4) --- Kafka Advanced Features: https://blog.51cto.com/9291927/2497828
- Kafka Quick Start (5) --- Kafka Administration: https://blog.51cto.com/9291927/2497842
- Kafka Quick Start (6) --- Kafka Cluster Deployment: https://blog.51cto.com/9291927/2498428
- Kafka Quick Start (7) --- Kafka Monitoring: https://blog.51cto.com/9291927/2498434
- Kafka Quick Start (8) --- Confluent Kafka Introduction: https://blog.51cto.com/9291927/2499090
- Kafka Quick Start (9) --- C Client: https://blog.51cto.com/9291927/2502001
- Kafka Quick Start (10) --- C++ Client: https://blog.51cto.com/9291927/2502063
- Kafka Quick Start (11) --- RdKafka Source Code Analysis: https://blog.51cto.com/9291927/2504489
- Kafka Quick Start (12) --- Python Client: https://blog.51cto.com/9291927/2504495
Python3 Learning (54): Using the confluent-kafka Module
From: https://blog.csdn.net/liao392781/article/details/90487438
confluent-kafka is a Python module that provides a lightweight wrapper around librdkafka, a Kafka client library written in C/C++, so its performance speaks for itself; in practice it is preferable to kafka-python. confluent-kafka-python is Confluent's Python client for Apache Kafka (https://kafka.apache.org) and Confluent Platform (https://www.confluent.io).
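As a quick reference, here is a minimal sketch of installing the module and checking which librdkafka build it wraps (the [avro] extra is only needed for the AvroProducer/AvroConsumer examples later in this article):

# pip install "confluent-kafka[avro]"
import confluent_kafka

# version() reports the Python client version and libversion() the bundled
# librdkafka version; each returns a (version_string, version_int) tuple
print(confluent_kafka.version())
print(confluent_kafka.libversion())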
Features:
- High performance: confluent-kafka-python is a lightweight wrapper around librdkafka (https://github.com/edenhill/librdkafka), a finely tuned C client; one consequence of this thin wrapping is shown in the sketch after this list.
- Reliability: there are many subtle details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage that work across all of our clients (also confluent-kafka-go, https://github.com/confluentinc/confluent-kafka-go, and confluent-kafka-dotnet, https://github.com/confluentinc/confluent-kafka-dotnet).
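Because the Python layer is so thin, the dict passed to Producer or Consumer is made up of librdkafka configuration properties handed straight through to the C library. A minimal sketch, assuming placeholder broker addresses:

from confluent_kafka import Producer

# Every key below is a standard librdkafka configuration property;
# 'mybroker1,mybroker2' is a placeholder broker list.
conf = {
    'bootstrap.servers': 'mybroker1,mybroker2',
    'acks': 'all',              # wait for all in-sync replicas to acknowledge
    'compression.type': 'lz4',  # compress message batches on the wire
    'linger.ms': 5,             # wait up to 5 ms to fill a batch before sending
}
p = Producer(conf)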
Example code:
# -*- coding: utf-8 -*-
# @Author :
# @Date :
# @File : kafka_operate.py
# @description : XXX
import time
import datetime
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka import Producer, Consumer, KafkaError

def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

def kafka_producer():
p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})
while True:
try:
current_date = str(datetime.datetime.now().replace(microsecond=0))
data = current_date
# Trigger any available delivery report callbacks from previous produce() calls
p.poll(0)
# Asynchronously produce a message, the delivery report callback
# will be triggered from poll() above, or flush() below, when the message has
# been successfully delivered or failed permanently.
p.produce('my_topic', data.encode('utf-8'), callback=delivery_report)
time.sleep(1)
        except BaseException as be:
            # BaseException also catches KeyboardInterrupt, so Ctrl-C stops
            # the loop and still reaches the flush() below
            print(be)
            break
# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()

def kafka_consumer():
c = Consumer({
'bootstrap.servers': 'mybroker',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
})
c.subscribe(['my_topic'])
    try:
        while True:
            msg = c.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            print('Received message: {}'.format(msg.value().decode('utf-8')))
    finally:
        # Leave the consumer group cleanly even if the loop is interrupted
        c.close()

def kafka_avro_producer():
value_schema_str = """
{
"namespace": "my.test",
"name": "value",
"type": "record",
"fields" : [
{
"name" : "name",
"type" : "string"
}
]
}
"""
key_schema_str = """
{
"namespace": "my.test",
"name": "key",
"type": "record",
"fields" : [
{
"name" : "name",
"type" : "string"
}
]
}
"""
value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"name": "Value"}
key = {"name": "Key"}
avro_producer = AvroProducer({
'bootstrap.servers': 'mybroker,mybroker2',
        'schema.registry.url': 'http://schema_registry_host:port'
}, default_key_schema=key_schema, default_value_schema=value_schema)
avro_producer.produce(topic='my_topic', value=value, key=key)
avro_producer.flush()
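
# Note: AvroProducer and AvroConsumer serialize and deserialize records against
# schemas stored in Confluent Schema Registry, which is why the configurations
# above and below both include a 'schema.registry.url' entry.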

def kafka_avro_consumer():
c = AvroConsumer({
'bootstrap.servers': 'mybroker,mybroker2',
'group.id': 'groupid',
'schema.registry.url': 'http://127.0.0.1:8081'})
c.subscribe(['my_topic'])
    try:
        while True:
            try:
                msg = c.poll(10)
            except SerializerError as e:
                # msg from a previous iteration may not exist here, so
                # report only the error itself
                print("Message deserialization failed: {}".format(e))
                break
            if msg is None:
                continue
            if msg.error():
                print("AvroConsumer error: {}".format(msg.error()))
                continue
            print(msg.value())
    finally:
        # Leave the consumer group cleanly even if the loop is interrupted
        c.close()

if __name__ == '__main__':
    # Run one of the demos, for example:
    # kafka_producer()
    # kafka_consumer()
    # kafka_avro_producer()
    # kafka_avro_consumer()
    pass
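To exercise the demos end to end, call kafka_producer() in one process and kafka_consumer() in another against a broker reachable at the configured bootstrap.servers: the producer sends one timestamp string per second to my_topic, and the consumer prints each message as it arrives. The Avro pair additionally requires a running Confluent Schema Registry at the configured schema.registry.url.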