librdkafka高级消费接口退出时卡住问题
程序员文章站
2022-09-28 18:34:33
近期在使用librdkafka消费者接口时遇到一个问题:当消息消费完成或用户主动退出时经常卡住,gdb attach上去看了一下是调用rd_kafka_destroy时一直阻塞:
剩余3个线...
近期在使用librdkafka消费者接口时遇到一个问题:当消息消费完成或用户主动退出时经常卡住,gdb attach上去看了一下是调用
rd_kafka_destroy时一直阻塞:
剩余3个线程,其中两个在pthread_join,另一个当前堆栈顶层函数是poll 查阅代码,并打印了poll的参数,该调用是有超时时间的。也就是说不会一直阻塞,问题应该出在其他地方。
(gdb) set pagination off (gdb) info thr Id Target Id Frame 3 Thread 0x7f1f506ba700 (LWP 24296) "rdk:main" 0x0000003074208705 in pthread_join (threadid=139772448966400, thread_return=0x7f1f506b5d58) at pthread_join.c:92 2 Thread 0x7f1f4f2b8700 (LWP 24299) "rdk:broker0" 0x0000003073ee1bfd in poll () at ../sysdeps/unix/syscall-template.S:81 * 1 Thread 0x7f1f514ef780 (LWP 23787) "mpp_test" 0x0000003074208705 in pthread_join (threadid=139772469946112, thread_return=0x7fff0d9410a8) at pthread_join.c:92 (gdb) thr apply all bt Thread 3 (Thread 0x7f1f506ba700 (LWP 24296)): Python Exception 'module' object has no attribute 'Command': #0 0x0000003074208705 in pthread_join (threadid=139772448966400, thread_return=0x7f1f506b5d58) at pthread_join.c:92 #1 0x0000000000475f28 in thrd_join (thr=139772448966400, res=0x0) at tinycthread.c:749 #2 0x000000000040c199 in rd_kafka_destroy_internal (rk=0x1b6c280) at rdkafka.c:850 #3 0x000000000040e7ac in rd_kafka_thread_main (arg=0x1b6c280) at rdkafka.c:1284 #4 0x0000000000475db1 in _thrd_wrapper_function (aArg=0x1b6d350) at tinycthread.c:624 #5 0x0000003074207213 in start_thread (arg=0x7f1f506ba700) at pthread_create.c:309 #6 0x0000003073eeb65d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111 Thread 2 (Thread 0x7f1f4f2b8700 (LWP 24299)): Python Exception 'module' object has no attribute 'Command': #0 0x0000003073ee1bfd in poll () at ../sysdeps/unix/syscall-template.S:81 #1 0x000000000043d969 in rd_kafka_transport_poll (rktrans=0x7f1f40000ab0, tmout=1000) at rdkafka_transport.c:1538 #2 0x000000000043d24c in rd_kafka_transport_io_serve (rktrans=0x7f1f40000ab0, timeout_ms=1000) at rdkafka_transport.c:1397 #3 0x000000000042081d in rd_kafka_broker_serve (rkb=0x1b6de40, abs_timeout=435806751453) at rdkafka_broker.c:2293 #4 0x0000000000425e63 in rd_kafka_broker_consumer_serve (rkb=0x1b6de40) at rdkafka_broker.c:3199 #5 0x0000000000426334 in rd_kafka_broker_thread_main (arg=0x1b6de40) at rdkafka_broker.c:3311 #6 0x0000000000475db1 in _thrd_wrapper_function (aArg=0x1b4c370) at tinycthread.c:624 #7 0x0000003074207213 in start_thread (arg=0x7f1f4f2b8700) at pthread_create.c:309 #8 0x0000003073eeb65d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111 Thread 1 (Thread 0x7f1f514ef780 (LWP 23787)): Python Exception 'module' object has no attribute 'Command': #0 0x0000003074208705 in pthread_join (threadid=139772469946112, thread_return=0x7fff0d9410a8) at pthread_join.c:92 #1 0x0000000000475f28 in thrd_join (thr=139772469946112, res=0x0) at tinycthread.c:749 #2 0x000000000040bc97 in rd_kafka_destroy_app (rk=0x1b6c280, blocking=1) at rdkafka.c:736 #3 0x000000000040bd18 in rd_kafka_destroy (rk=0x1b6c280) at rdkafka.c:749 #4 0x0000000000407718 in closeKafkaConsumerConnection (kafka_cxt=0x1b4c010) at test.c:618 #5 0x0000000000408e02 in main (argc=12, argv=0x7fff0d9412e8) at test.c:1259
根据线程2的调用堆栈查阅代码,看到rd_kafka_broker_consumer_serve 函数中有一个循环。于是在循环条件处设断点,然后continue继续执行,果然进入断点:
static void rd_kafka_broker_consumer_serve (rd_kafka_broker_t *rkb) { rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread)); rd_kafka_broker_lock(rkb); while (!rd_kafka_broker_terminating(rkb) && rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) { rd_ts_t now; rd_ts_t min_backoff; rd_kafka_broker_unlock(rkb); …… rd_kafka_broker_serve(rkb, now + (rkb->rkb_blocking_max_ms * 1000)); rd_kafka_broker_lock(rkb); } …… }
打印循环控制变量: 其中rd_kafka_broker_terminating(rkb)是一个宏,其展开来定义如下:(rd_refcnt_get(&(rkb)->rkb_refcnt) <= 1,也就是判断(rkb)的引用计数是否<=1。个人理解<=1说明只有当前这个指针还在引用它,没有其他指针指向这个内存,可以安全释放。
Breakpoint 4, rd_kafka_broker_consumer_serve (rkb=0x1b6de40) at rdkafka_broker.c:3159 3159 rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) { (gdb) p rkb->rkb_state $3 = RD_KAFKA_BROKER_STATE_UP (gdb) p rd_refcnt_get(&(rkb)->rkb_refcnt) $4 = 16
很显然这里是因为(rkb)->rkb_refcnt这个引用计数不满足<=1的条件。常识判断应该是有什么对象没有释放或者销毁,以下是退出关闭环节的主要代码:仔细检查了几遍,似乎并没有什么问题。
err = rd_kafka_consumer_close(rk); if (err) log(WARNING, "Failed to close consumer: %s\n", rd_kafka_err2str(err)); else log(WARNING, "Consumer closed\n"); rd_kafka_destroy(rk); rd_kafka_topic_partition_list_destroy(tpl); /* Let background threads clean up and terminate cleanly. */ int run = 5; while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1) log(WARNING, "Waiting for librdkafka to decommission\n"); if (run <= 0) rd_kafka_dump(stdout, rk);
作为对比,用librdkafka自带的demo测试了几次,发现没有这个问题。二者主要区别在于demo收到消息后仅增加统计计数然后就销毁。而自己的程序根据业务需要对消息做了解析,仔细检查了发现有一个解析出错的异常分支,当初测试时为了省事直接continue了,没有认真处理,也没有调用 rd_kafka_message_destroy 销毁消息对象。加上这一句后即一切正常。 总结一下 解析kafka消息时,有个别消息不满足业务规则进入异常分支,没有调用rd_kafka_message_destroy,造成内存泄露并导致rd_kafka_broker_t对象的引用计数无法释放,在退出时因不满足条件而一直循环。
上一篇: Nanui For WinForm
下一篇: vue几个常用跨域处理方式介绍