kafka 安装 kafka-manager 安装 及python调用
程序员文章站
2022-03-14 21:18:04
...
kafka 安装 kafka-manager 安装 及python调用
docker-compose 文件
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.1.88
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /data/product/zj_bigdata/data/kafka/docker.sock:/var/run/docker.sock
kafka-manager:
image: sheepkiller/kafka-manager:latest
ports:
- "9000:9000"
links:
- zookeeper
- kafka
environment:
ZK_HOSTS: zookeeper:2181
APPLICATION_SECRET: letmein
KM_ARGS: -Djava.net.preferIPv4Stack=true
kafka-python 1.4.7
github地址: https://github.com/dpkp/kafka-python
文档地址: https://kafka-python.readthedocs.io/en/master/apidoc/modules.html
由于消息使用snappy 压缩方式,还需要安装python-snappy 0.5.4
snappy 安装需要gcc 及
ubuntu : sudo apt-get install libsnappy-dev
centos : yaml install snappy-devel
客户端代码
class KafkaConsumerM(object):
def __init__(self,host,port,topic,group_id):
self.consumer = KafkaConsumer(topic,group_id=group_id, bootstrap_servers=[f'{host}:{port}'],
auto_offset_reset='latest',
enable_auto_commit=True,auto_commit_interval_ms=2000,consumer_timeout_ms=10*60*1000)
try:
MyKafkaConsumerM = KafkaConsumerM(Config.KAFKA_HOST, Config.KAFKA_PORT, Config.KAFKA_TOPIC, Config.KAFKA_TOPIC)
except Exception as e:
print(os.getcwd(),": connect error:",e)
os._exit(1)
链接代码:
from dbsource.mongoHelper import ServerMongoStore
from dbsource.kafkaHelper import MyKafkaConsumerM
import json
import os
import time
from etc.config import Config
from comm.funcs import Funcs
from comm.log import logger
logs = []
cachelen = Config.CACHE_LEN
def saveToMongo():
logger.info("saveToMongo 任务启动")
# 将flume 发送到kafka的内容记录到mongo中
while True:
try:
for msg in MyKafkaConsumerM.consumer:
try:
log = json.loads(msg[6].decode("utf-8"))
try:
log["status"] = log["response"]["status"]
log["time"] = int(time.time())
del log["latencies"]
del log["service"]
del log["route"]
del log["response"]
del log["tries"]
except Exception as e:
print(os.getcwd(), ", json decode error:", e)
logs.append(log)
except Exception as e:
print(os.getcwd(), ", delete keys error:", e)
continue
try:
if len(logs) >= cachelen:
ServerMongoStore.insertMany(logs)
logs.clear()
except Exception as e:
print(os.getcwd(), ", mongo insert error:", e)
time.sleep(1)
except Exception as e:
print(os.getcwd(), ", error:", e)
continue