欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

librdkafka高级消费接口退出时卡住问题

程序员文章站 2022-05-11 10:11:11
近期在使用librdkafka消费者接口时遇到一个问题:当消息消费完成或用户主动退出时经常卡住,gdb attach上去看了一下是调用rd_kafka_destroy时一直阻塞: 剩余3个线...
近期在使用librdkafka消费者接口时遇到一个问题:当消息消费完成或用户主动退出时经常卡住,gdb attach上去看了一下是调用rd_kafka_destroy时一直阻塞:
剩余3个线程,其中两个在pthread_join,另一个当前堆栈顶层函数是poll 查阅代码,并打印了poll的参数,该调用是有超时时间的。也就是说不会一直阻塞,问题应该出在其他地方。
(gdb) set pagination off
(gdb) info thr
  Id   Target Id         Frame 
  3    Thread 0x7f1f506ba700 (LWP 24296) "rdk:main" 0x0000003074208705 in pthread_join (threadid=139772448966400, thread_return=0x7f1f506b5d58) at pthread_join.c:92
  2    Thread 0x7f1f4f2b8700 (LWP 24299) "rdk:broker0" 0x0000003073ee1bfd in poll () at ../sysdeps/unix/syscall-template.S:81
* 1    Thread 0x7f1f514ef780 (LWP 23787) "mpp_test" 0x0000003074208705 in pthread_join (threadid=139772469946112, thread_return=0x7fff0d9410a8) at pthread_join.c:92

(gdb) thr apply all bt

Thread 3 (Thread 0x7f1f506ba700 (LWP 24296)):
Python Exception  'module' object has no attribute 'Command': 
#0  0x0000003074208705 in pthread_join (threadid=139772448966400, thread_return=0x7f1f506b5d58) at pthread_join.c:92
#1  0x0000000000475f28 in thrd_join (thr=139772448966400, res=0x0) at tinycthread.c:749
#2  0x000000000040c199 in rd_kafka_destroy_internal (rk=0x1b6c280) at rdkafka.c:850
#3  0x000000000040e7ac in rd_kafka_thread_main (arg=0x1b6c280) at rdkafka.c:1284
#4  0x0000000000475db1 in _thrd_wrapper_function (aArg=0x1b6d350) at tinycthread.c:624
#5  0x0000003074207213 in start_thread (arg=0x7f1f506ba700) at pthread_create.c:309
#6  0x0000003073eeb65d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Thread 2 (Thread 0x7f1f4f2b8700 (LWP 24299)):
Python Exception  'module' object has no attribute 'Command': 
#0  0x0000003073ee1bfd in poll () at ../sysdeps/unix/syscall-template.S:81
#1  0x000000000043d969 in rd_kafka_transport_poll (rktrans=0x7f1f40000ab0, tmout=1000) at rdkafka_transport.c:1538
#2  0x000000000043d24c in rd_kafka_transport_io_serve (rktrans=0x7f1f40000ab0, timeout_ms=1000) at rdkafka_transport.c:1397
#3  0x000000000042081d in rd_kafka_broker_serve (rkb=0x1b6de40, abs_timeout=435806751453) at rdkafka_broker.c:2293
#4  0x0000000000425e63 in rd_kafka_broker_consumer_serve (rkb=0x1b6de40) at rdkafka_broker.c:3199
#5  0x0000000000426334 in rd_kafka_broker_thread_main (arg=0x1b6de40) at rdkafka_broker.c:3311
#6  0x0000000000475db1 in _thrd_wrapper_function (aArg=0x1b4c370) at tinycthread.c:624
#7  0x0000003074207213 in start_thread (arg=0x7f1f4f2b8700) at pthread_create.c:309
#8  0x0000003073eeb65d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Thread 1 (Thread 0x7f1f514ef780 (LWP 23787)):
Python Exception  'module' object has no attribute 'Command': 
#0  0x0000003074208705 in pthread_join (threadid=139772469946112, thread_return=0x7fff0d9410a8) at pthread_join.c:92
#1  0x0000000000475f28 in thrd_join (thr=139772469946112, res=0x0) at tinycthread.c:749
#2  0x000000000040bc97 in rd_kafka_destroy_app (rk=0x1b6c280, blocking=1) at rdkafka.c:736
#3  0x000000000040bd18 in rd_kafka_destroy (rk=0x1b6c280) at rdkafka.c:749
#4  0x0000000000407718 in closeKafkaConsumerConnection (kafka_cxt=0x1b4c010) at test.c:618
#5  0x0000000000408e02 in main (argc=12, argv=0x7fff0d9412e8) at test.c:1259
根据线程2的调用堆栈查阅代码,看到rd_kafka_broker_consumer_serve 函数中有一个循环。于是在循环条件处设断点,然后continue继续执行,果然进入断点:
static void rd_kafka_broker_consumer_serve (rd_kafka_broker_t *rkb) {
    rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
    rd_kafka_broker_lock(rkb);

    while (!rd_kafka_broker_terminating(rkb) &&
           rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) {
        rd_ts_t now;
                rd_ts_t min_backoff;

        rd_kafka_broker_unlock(rkb);
        ……

        rd_kafka_broker_serve(rkb,
                now + (rkb->rkb_blocking_max_ms * 1000));

        rd_kafka_broker_lock(rkb);
    }
    ……
}
打印循环控制变量: 其中rd_kafka_broker_terminating(rkb)是一个宏,其展开来定义如下:(rd_refcnt_get(&(rkb)->rkb_refcnt) <= 1,也就是判断(rkb)的引用计数是否<=1。个人理解<=1说明只有当前这个指针还在引用它,没有其他指针指向这个内存,可以安全释放。
Breakpoint 4, rd_kafka_broker_consumer_serve (rkb=0x1b6de40) at rdkafka_broker.c:3159
3159               rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) {
(gdb) p rkb->rkb_state
$3 = RD_KAFKA_BROKER_STATE_UP
(gdb) p rd_refcnt_get(&(rkb)->rkb_refcnt)
$4 = 16
很显然这里是因为(rkb)->rkb_refcnt这个引用计数不满足<=1的条件。常识判断应该是有什么对象没有释放或者销毁,以下是退出关闭环节的主要代码:仔细检查了几遍,似乎并没有什么问题。
    err = rd_kafka_consumer_close(rk);
    if (err)
        log(WARNING, "Failed to close consumer: %s\n",  rd_kafka_err2str(err));
    else
        log(WARNING, "Consumer closed\n");

    rd_kafka_destroy(rk);
    rd_kafka_topic_partition_list_destroy(tpl);

    /* Let background threads clean up and terminate cleanly. */
    int run = 5;
    while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
        log(WARNING, "Waiting for librdkafka to decommission\n");
    if (run <= 0)
        rd_kafka_dump(stdout, rk);
作为对比,用librdkafka自带的demo测试了几次,发现没有这个问题。二者主要区别在于demo收到消息后仅增加统计计数然后就销毁。而自己的程序根据业务需要对消息做了解析,仔细检查了发现有一个解析出错的异常分支,当初测试时为了省事直接continue了,没有认真处理,也没有调用 rd_kafka_message_destroy 销毁消息对象。加上这一句后即一切正常。 总结一下 解析kafka消息时,有个别消息不满足业务规则进入异常分支,没有调用rd_kafka_message_destroy,造成内存泄露并导致rd_kafka_broker_t对象的引用计数无法释放,在退出时因不满足条件而一直循环。