Kafka Issue: Messages Lost When Sending from a Producer
程序员文章站
2022-06-14 09:08:36
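Problem description
When sending many messages to Kafka, I found that some of the data was lost, so I started investigating.
Investigation
The program first reads a file and then sends the messages to Kafka. While it was sending, the Kafka producer threw an exception whose message contained "has passed since last append". The cause turned out to be the producer configuration option request.timeout.ms. Looking at the producer source, I found the following code: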
    boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
            expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
        else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
            expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
        else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
            expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";

        boolean expired = expiryErrorMessage != null;
        if (expired)
            abortRecordAppends();
        return expired;
    }
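According to the comments in the code, a batch expires when it is not in retry, it is ready (full, or linger.ms has elapsed), and the request timeout has then elapsed. The "has passed since last append" message comes from the first branch: the batch is full, it is not being retried, and more than request.timeout.ms has passed since the last record was appended to it.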
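To make the three branches easier to follow, here is a minimal standalone sketch of the same decision as a pure function. It is not the Kafka class itself: the class name BatchExpirySketch and the three elapsed-time parameters are hypothetical, and it assumes that createdTimeMs(now) and waitedTimeMs(now) in the original return the time elapsed since batch creation and since the last send attempt.

```java
// Standalone sketch of the expiry decision; NOT the real Kafka producer class.
final class BatchExpirySketch {

    /**
     * Returns the expiry message, or null if the batch has not expired.
     * The three since*Ms arguments are hypothetical stand-ins for the
     * elapsed times that the original method derives from `now`.
     */
    static String expiryMessage(boolean inRetry, boolean isFull,
                                long requestTimeoutMs, long retryBackoffMs, long lingerMs,
                                long sinceLastAppendMs, long sinceCreationMs,
                                long sinceLastAttemptMs) {
        // Branch 1: a full batch, not in retry, idle for longer than the timeout.
        if (!inRetry && isFull && requestTimeoutMs < sinceLastAppendMs)
            return sinceLastAppendMs + " ms has passed since last append";
        // Branch 2: not in retry, and older than the timeout even after subtracting linger.ms.
        if (!inRetry && requestTimeoutMs < sinceCreationMs - lingerMs)
            return (sinceCreationMs - lingerMs)
                    + " ms has passed since batch creation plus linger time";
        // Branch 3: in retry, and the wait since the last attempt exceeds timeout plus backoff.
        if (inRetry && requestTimeoutMs < sinceLastAttemptMs - retryBackoffMs)
            return (sinceLastAttemptMs - retryBackoffMs)
                    + " ms has passed since last attempt plus backoff time";
        return null;
    }

    public static void main(String[] args) {
        // Full batch, idle for 31 s, timeout 30 s: expires through branch 1.
        System.out.println(expiryMessage(false, true, 30_000, 100, 0, 31_000, 31_000, 0));
        // Same batch with a 60 s timeout: not expired, prints null.
        System.out.println(expiryMessage(false, true, 60_000, 100, 0, 31_000, 31_000, 0));
    }
}
```

With the illustrative numbers in main, the first call returns "31000 ms has passed since last append" and the second returns null, which is why raising the timeout makes the exception go away for a batch that waits the same amount of time.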
To fix this, increase request.timeout.ms. Another option is to send the messages from multiple threads.
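As a rough illustration of the first fix, the sketch below builds producer properties with a larger request.timeout.ms. The broker address localhost:9092, the value 60000 and the String serializers are assumptions chosen for the example, not values from the original setup. The class name ProducerTimeoutConfigSketch is hypothetical too. The code uses only java.util.Properties so that it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Sketch only: builds the configuration; creating the producer is left to the caller.
final class ProducerTimeoutConfigSketch {

    /** Builds producer properties with the given request timeout (values are illustrative). */
    static Properties producerProps(String bootstrapServers, int requestTimeoutMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Assumed serializers; replace with whatever the application actually sends.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The setting discussed above: how long a batch may wait before it is expired.
        props.put("request.timeout.ms", Integer.toString(requestTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Assumed broker address and timeout, for illustration only.
        Properties props = producerProps("localhost:9092", 60_000);
        System.out.println(props.getProperty("request.timeout.ms")); // prints 60000
        // With kafka-clients on the classpath, these properties would be passed to
        // new KafkaProducer<>(props).
    }
}
```

A suitable timeout depends on how quickly the broker accepts the batches, so the value should be tuned against the actual workload instead of copied from this example.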