欢迎您访问程序员文章站！本站旨在为大家分享程序员计算机编程知识！
您现在的位置是: 首页

用 Python 统计 Kafka 消费的数据量

程序员文章站 2022-04-18 20:08:58
...
from pykafka import KafkaClient
from pykafka.common import OffsetType
import datetime
import pdb
'''
提升点:
	1. 学会用pdb打断点调试
	2. 学会利用datetime.timedelta()控制程序执行的时间长度

'''
def KafkaDownloader(host_, topic_, group_id_, duration_minutes=10):
    """Consume a Kafka topic for a fixed time window and count the messages.

    A pykafka simple consumer is attached to ``topic_`` in consumer group
    ``group_id_``. It starts from the latest offset (``OffsetType.LATEST``
    with ``reset_offset_on_start=True``), so only messages produced after
    start-up are counted. Offsets are not committed
    (``auto_commit_enable=False``).

    Args:
        host_: Comma-separated ``host:port`` list of Kafka brokers.
        topic_: Name of the topic to consume.
        group_id_: Consumer group name.
        duration_minutes: Length of the counting window in minutes
            (default 10, matching the original hard-coded value).

    Returns:
        The number of non-None messages received up to and including the
        first one seen at or after the deadline minute. The running count
        is also stored in the module-level global ``c``; this is kept for
        backward compatibility.

    Note:
        The deadline is only checked when a message arrives. Presumably
        pykafka's consumer iterator blocks while the topic is idle, so the
        function can outlive the window on a quiet topic. TODO confirm
        against the configured ``consumer_timeout_ms``.
    """
    global c  # kept as a module-level global for backward compatibility
    c = 0
    client = KafkaClient(hosts=host_)
    # pykafka expects bytes for topic and group names; .encode() already
    # returns bytes, so no extra bytes(...) wrapper is needed.
    _topic = client.topics[topic_.encode()]
    consumer = _topic.get_simple_consumer(
        consumer_group=group_id_.encode(),
        auto_commit_enable=False,
        auto_offset_reset=OffsetType.LATEST,
        reset_offset_on_start=True,
    )
    if consumer is None:
        return c

    # Timestamps are truncated to minute precision, matching the old
    # str(now)[0:16] slice ("YYYY-MM-DD HH:MM"). This fixed-width format
    # sorts lexicographically in chronological order, so ">=" is valid.
    minute_fmt = "%Y-%m-%d %H:%M"
    deadline = datetime.datetime.now() + datetime.timedelta(minutes=duration_minutes)
    end_time = deadline.strftime(minute_fmt)
    try:
        for message in consumer:
            if message is None:
                continue
            curr_time = datetime.datetime.now().strftime(minute_fmt)
            print("curr_time:" + curr_time + "<--------->end_time:" + end_time)
            c += 1
            # ">=" instead of "==": if no message arrives during the exact
            # end minute, an equality test would never fire again and the
            # loop would run forever.
            if curr_time >= end_time:
                break
            print(c)
    finally:
        # Release the consumer's fetcher/broker resources even on error.
        consumer.stop()
    return c


def get_kafka_data(hosts="ip:9092,ip:9092", topic="_topic", group="test"):
    """Count messages on a Kafka topic via KafkaDownloader and print the total.

    Args:
        hosts: Comma-separated ``host:port`` broker list. The default looks
            like a placeholder and must be replaced with real brokers.
        topic: Topic name to consume (default ``"_topic"``).
        group: Consumer group name (default ``"test"``).
    """
    # The original had a trailing ':' after this call, which is a
    # SyntaxError. The result is a message count, not a message.
    count = KafkaDownloader(hosts, topic, group)
    print(count)


if __name__ == "__main__":
    # Only start consuming when run as a script, never on import.
    get_kafka_data()

相关标签: python 实战