Installing Kafka and kafka-manager, and calling Kafka from Python

程序员文章站 2022-03-14 21:18:04

docker-compose file

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.1.88
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /data/product/zj_bigdata/data/kafka/docker.sock:/var/run/docker.sock
  kafka-manager:
    image: sheepkiller/kafka-manager:latest
    ports:
      - "9000:9000"
    links:
      - zookeeper
      - kafka
    environment:
      ZK_HOSTS: zookeeper:2181
      APPLICATION_SECRET: letmein
      KM_ARGS: -Djava.net.preferIPv4Stack=true
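
The `KAFKA_CREATE_TOPICS` value in the file above follows the wurstmeister/kafka image's `name:partitions:replicas` convention, so `test:1:1` requests a topic named `test` with one partition and one replica. As a rough illustration only, the sketch below splits such a value into its parts. `TopicSpec` and `parse_create_topics` are hypothetical helpers written for this article; they are not part of the image or of kafka-python.

```python
from typing import List, NamedTuple


class TopicSpec(NamedTuple):
    name: str
    partitions: int
    replicas: int


def parse_create_topics(value: str) -> List[TopicSpec]:
    """Split a comma-separated 'name:partitions:replicas' string into specs."""
    specs = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        parts = item.split(":")
        if len(parts) < 3:
            raise ValueError(f"expected name:partitions:replicas, got {item!r}")
        # Any fourth field (for example a cleanup policy) is ignored in this sketch.
        specs.append(TopicSpec(parts[0], int(parts[1]), int(parts[2])))
    return specs


if __name__ == "__main__":
    print(parse_create_topics("test:1:1"))
```

Reading the value this way makes it easier to see what the broker is asked to create before the consumer subscribes to `test`.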

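The Python client is kafka-python 1.4.7.
GitHub: https://github.com/dpkp/kafka-python
Documentation: https://kafka-python.readthedocs.io/en/master/apidoc/modules.html
Because the messages are snappy-compressed, python-snappy 0.5.4 must also be installed.
Installing python-snappy requires gcc and the snappy development package: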

Ubuntu: sudo apt-get install libsnappy-dev
CentOS: sudo yum install snappy-devel
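
After installing the system package and running `pip install python-snappy`, a quick check like the one below can confirm that the module is usable. This is a minimal sketch: `check_snappy` is a hypothetical helper name, and it assumes the importable module is called `snappy`, which is the name python-snappy installs.

```python
import importlib.util


def snappy_available() -> bool:
    """Return True if a module named 'snappy' can be found."""
    return importlib.util.find_spec("snappy") is not None


def check_snappy() -> str:
    """Describe whether snappy compression works in this environment."""
    if not snappy_available():
        return "python-snappy is not installed"
    try:
        import snappy  # imported lazily so this file loads without the package

        data = b"hello kafka"
        ok = snappy.decompress(snappy.compress(data)) == data
    except Exception as exc:  # e.g. a broken build or an unrelated 'snappy' module
        return f"snappy import or round trip failed: {exc}"
    return "python-snappy works" if ok else "snappy round trip returned different data"


if __name__ == "__main__":
    print(check_snappy())
```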

Client code

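import os

from kafka import KafkaConsumer
from etc.config import Config


class KafkaConsumerM(object):

    def __init__(self, host, port, topic, group_id):
        self.consumer = KafkaConsumer(
            topic, group_id=group_id, bootstrap_servers=[f'{host}:{port}'],
            auto_offset_reset='latest',  # start at new messages when no offset is committed
            enable_auto_commit=True, auto_commit_interval_ms=2000,  # commit every 2 seconds
            consumer_timeout_ms=10 * 60 * 1000)  # stop iterating after 10 idle minutes


try:
    # Note: the topic name is also passed as the consumer group id.
    MyKafkaConsumerM = KafkaConsumerM(Config.KAFKA_HOST, Config.KAFKA_PORT, Config.KAFKA_TOPIC, Config.KAFKA_TOPIC)
except Exception as e:
    print(os.getcwd(), ": connect error:", e)
    os._exit(1)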
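
To exercise the consumer without waiting for real traffic, it can help to push a test message into the topic. The sketch below is one possible way to do that with kafka-python's `KafkaProducer`, using snappy compression as described earlier. `encode_log` and `send_test_message` are hypothetical helper names, and the payload shape in the usage note is made up for illustration.

```python
import json


def encode_log(log: dict) -> bytes:
    """Serialize a dict to UTF-8 JSON bytes for use as the message value."""
    return json.dumps(log).encode("utf-8")


def send_test_message(host: str, port: int, topic: str, log: dict) -> None:
    """Send one snappy-compressed JSON message and wait for the broker's ack."""
    from kafka import KafkaProducer  # lazy import: needs kafka-python installed

    producer = KafkaProducer(
        bootstrap_servers=[f"{host}:{port}"],
        compression_type="snappy",  # needs python-snappy
        value_serializer=encode_log,
    )
    try:
        # send() is asynchronous; get() blocks until delivery succeeds or fails.
        producer.send(topic, log).get(timeout=10)
    finally:
        producer.close()
```

With the docker-compose settings from this article, a call might look like `send_test_message("192.168.1.88", 9092, "test", {"response": {"status": 200}})`.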
Calling code:
from dbsource.mongoHelper import ServerMongoStore
from dbsource.kafkaHelper import MyKafkaConsumerM
import json
import os
import time
from etc.config import Config
from comm.funcs import Funcs
from comm.log import logger

logs = []
cachelen = Config.CACHE_LEN
def saveToMongo():
    logger.info("saveToMongo task started")
    # Store the content that Flume sends to Kafka in MongoDB
    while True:
        try:
            for msg in MyKafkaConsumerM.consumer:
                try:
                    # msg.value holds the raw message bytes (same field as msg[6])
                    log = json.loads(msg.value.decode("utf-8"))
                    try:
                        log["status"] = log["response"]["status"]
                        log["time"] = int(time.time())
                        del log["latencies"]
                        del log["service"]
                        del log["route"]
                        del log["response"]
                        del log["tries"]
                    except Exception as e:
                        print(os.getcwd(), ", delete keys error:", e)
                    logs.append(log)
                except Exception as e:
                    print(os.getcwd(), ", json decode error:", e)
                    continue
                try:
                    # Write to MongoDB in batches of at least cachelen records
                    if len(logs) >= cachelen:
                        ServerMongoStore.insertMany(logs)
                        logs.clear()
                except Exception as e:
                    print(os.getcwd(), ", mongo insert error:", e)
                time.sleep(1)
        except Exception as e:
            print(os.getcwd(), ", error:", e)
            continue
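
The field trimming and batching in the loop above can be tried out without Kafka or MongoDB. The following is a minimal sketch of that logic as standalone pieces. `trim_log`, `DROP_KEYS`, and `BatchBuffer` are hypothetical names introduced here, the sample record is invented, and unlike the original `del` statements this version silently skips keys that are missing.

```python
import json
import time
from typing import Callable, List, Optional

DROP_KEYS = ("latencies", "service", "route", "response", "tries")


def trim_log(raw: bytes, now: Optional[int] = None) -> dict:
    """Decode one message value and drop the fields that are not stored."""
    log = json.loads(raw.decode("utf-8"))
    log["status"] = log["response"]["status"]
    log["time"] = int(time.time()) if now is None else now
    for key in DROP_KEYS:
        log.pop(key, None)  # tolerate records that lack a key
    return log


class BatchBuffer:
    """Collect records and hand them to `sink` once `size` is reached."""

    def __init__(self, size: int, sink: Callable[[List[dict]], None]):
        self.size = size
        self.sink = sink
        self.items: List[dict] = []

    def add(self, item: dict) -> None:
        self.items.append(item)
        if len(self.items) >= self.size:
            self.sink(list(self.items))  # pass a copy so clearing is safe
            self.items.clear()


if __name__ == "__main__":
    batches: List[List[dict]] = []
    buf = BatchBuffer(size=2, sink=batches.append)
    sample = json.dumps({"response": {"status": 200}, "tries": []}).encode("utf-8")
    for _ in range(3):
        buf.add(trim_log(sample, now=0))
    print(batches)    # batches flushed so far
    print(buf.items)  # records still waiting for the next flush
```

In the real loop the sink would be `ServerMongoStore.insertMany` and the size would be `Config.CACHE_LEN`.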