欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka问题

程序员文章站 2022-06-14 09:08:36
...

has passed since last append

问题描述

当发送多条消息到kafka的时候发现数据丢失,开始排查问题。

排查过程

首先读取文件之后发送消息到kafka。发的同时发现kafka抛出has passed since last append的异常。发现出现这个问题的原因是kafka producer的一个配置项request.timeout.ms导致的。查看producer源码,发现有如下代码:

 boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
            expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
        else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
            expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
        else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
            expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";

        boolean expired = expiryErrorMessage != null;
        if (expired)
            abortRecordAppends();
        return expired;
    }

根据代码注释可得知,当batch不处于retry,并且ready(full or linger.ms has reached)之后,request timeout超时。
解决此问题,可以将request.timeout.ms调大。还有另外一种方式是采取多线程的方式。

相关标签: kafak last append