关于高并发下kafka producer send异步发送耗时问题的分析
最近开发网关服务的过程当中,需要用到kafka转发消息与保存日志,在进行压测的过程中由于是多线程并发操作kafka producer 进行异步send,发现send耗时有时会达到几十毫秒的阻塞,很大程度上上影响了并发的性能,而在后续的测试中发现单线程发送反而比多线程发送效率高出几倍。所以就对kafka api send 的源码进行了一下跟踪和分析,在此总结记录一下。
首先看springboot下 kafka producer 的使用
在config中进行配置,向ioc容器中注入defaultkafkaproducerfactory生产者工厂的实例
@bean public producerfactory<object, object> producerfactory() { return new defaultkafkaproducerfactory<>(producerconfigs()); }
创建producer
this.producer = producerfactory.createproducer();
大家都知道springboot下ioc容器管理的实例默认都是单例模式;而defaultkafkaproducerfactory本身也是一个单例工厂
@override public producer<k, v> createproducer() { if (this.transactionidprefix != null) { return createtransactionalproducer(); } if (this.producer == null) { synchronized (this) { if (this.producer == null) { this.producer = new closesafeproducer<k, v>(createkafkaproducer()); } } } return this.producer; }
我们创建的producer也是个单例。
接下来就是具体的发送,用过kafka的小伙伴都知道producer.send是个异步操作,会返回一个future<recordmetadata> 类型的结果。那么为什么单线程和多线程send效率会较大的差距呢,我们进入kafkaproducer内部看下producer.send的具体源码实现来找下答案
private future<recordmetadata> dosend(producerrecord<k, v> record, callback callback) { topicpartition tp = null; try { //保证主题的元数据可用 clusterandwaittime clusterandwaittime = waitonmetadata(record.topic(), record.partition(), maxblocktimems); long remainingwaitms = math.max(0, maxblocktimems - clusterandwaittime.waitedonmetadatams); cluster cluster = clusterandwaittime.cluster; byte[] serializedkey; try { //序列化key serializedkey = keyserializer.serialize(record.topic(), record.headers(), record.key()); } catch (classcastexception cce) { throw new serializationexception("can't convert key of class " + record.key().getclass().getname() + " to class " + producerconfig.getclass(producerconfig.key_serializer_class_config).getname() + " specified in key.serializer", cce); } byte[] serializedvalue; try { //序列化value serializedvalue = valueserializer.serialize(record.topic(), record.headers(), record.value()); } catch (classcastexception cce) { throw new serializationexception("can't convert value of class " + record.value().getclass().getname() + " to class " + producerconfig.getclass(producerconfig.value_serializer_class_config).getname() + " specified in value.serializer", cce); } //计算出具体的partition int partition = partition(record, serializedkey, serializedvalue, cluster); tp = new topicpartition(record.topic(), partition); setreadonly(record.headers()); header[] headers = record.headers().toarray(); int serializedsize = abstractrecords.estimatesizeinbytesupperbound(apiversions.maxusableproducemagic(), compressiontype, serializedkey, serializedvalue, headers); ensurevalidrecordsize(serializedsize); long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); log.trace("sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); // producer callback will make sure to call both 'callback' and interceptor callback callback interceptcallback = new interceptorcallback<>(callback, this.interceptors, tp); if (transactionmanager != null && transactionmanager.istransactional()) transactionmanager.maybeaddpartitiontotransaction(tp); //向队列容器中添加数据 recordaccumulator.recordappendresult result = accumulator.append(tp, timestamp, serializedkey, serializedvalue, headers, interceptcallback, remainingwaitms); if (result.batchisfull || result.newbatchcreated) { log.trace("waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future; // handling exceptions and record the errors; // for api exceptions return them in the future, // for other exceptions throw directly } catch (apiexception e) { log.debug("exception occurred during message send:", e); if (callback != null) callback.oncompletion(null, e); this.errors.record(); this.interceptors.onsenderror(record, tp, e); return new futurefailure(e); } catch (interruptedexception e) { this.errors.record(); this.interceptors.onsenderror(record, tp, e); throw new interruptexception(e); } catch (bufferexhaustedexception e) { this.errors.record(); this.metrics.sensor("buffer-exhausted-records").record(); this.interceptors.onsenderror(record, tp, e); throw e; } catch (kafkaexception e) { this.errors.record(); this.interceptors.onsenderror(record, tp, e); throw e; } catch (exception e) { // we notify interceptor about all exceptions, since onsend is called before anything else in this method this.interceptors.onsenderror(record, tp, e); throw e; } }
这里除了前面做的一些序列化操作和判断,最关键的就是向队列容器中执行添加数据操作
recordaccumulator.recordappendresult result = accumulator.append(tp, timestamp, serializedkey, serializedvalue, headers, interceptcallback, remainingwaitms);
accumulator是recordaccumulator这个类的一个实例,recordaccumulator类是一个队列容器类;它的内部维护了一个concurrentmap,每一个topicpartition都对应一个专属的消息队列。
private final concurrentmap<topicpartition, deque<producerbatch>> batches;
我们进入accumulator.append内部看下具体的实现
public recordappendresult append(topicpartition tp, long timestamp, byte[] key, byte[] value, header[] headers, callback callback, long maxtimetoblock) throws interruptedexception { // we keep track of the number of appending thread to make sure we do not miss batches in // abortincompletebatches(). appendsinprogress.incrementandget(); bytebuffer buffer = null; if (headers == null) headers = record.empty_headers; try { //根据topicpartition拿到对应的批处理队列 deque<producerbatch> dq = getorcreatedeque(tp); //同步队列,保证线程安全 synchronized (dq) { if (closed) throw new illegalstateexception("cannot send after the producer is closed."); //把序列化后的数据放入队列,并返回结果 recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq); if (appendresult != null) return appendresult; } // we don't have an in-progress record batch try to allocate a new batch byte maxusablemagic = apiversions.maxusableproducemagic(); int size = math.max(this.batchsize, abstractrecords.estimatesizeinbytesupperbound(maxusablemagic, compression, key, value, headers)); log.trace("allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); buffer = free.allocate(size, maxtimetoblock); synchronized (dq) { // need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new illegalstateexception("cannot send after the producer is closed."); recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq); if (appendresult != null) { // somebody else found us a batch, return the one we waited for! hopefully this doesn't happen often... return appendresult; } memoryrecordsbuilder recordsbuilder = recordsbuilder(buffer, maxusablemagic); producerbatch batch = new producerbatch(tp, recordsbuilder, time.milliseconds()); futurerecordmetadata future = utils.notnull(batch.tryappend(timestamp, key, value, headers, callback, time.milliseconds())); dq.addlast(batch); incomplete.add(batch); // don't deallocate this buffer in the finally block as it's being used in the record batch buffer = null; return new recordappendresult(future, dq.size() > 1 || batch.isfull(), true); } } finally { if (buffer != null) free.deallocate(buffer); appendsinprogress.decrementandget(); } }
在getorcreatedeque中我们根据topicpartition从concurrentmap获取对应队列,没有的话就初始化一个。
private deque<producerbatch> getorcreatedeque(topicpartition tp) { deque<producerbatch> d = this.batches.get(tp); if (d != null) return d; d = new arraydeque<>(); deque<producerbatch> previous = this.batches.putifabsent(tp, d); if (previous == null) return d; else return previous; }
更关键的是为了保证并发时的线程安全,执行 recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq)时,deque<producerbatch>必然需要同步处理。
synchronized (dq) { if (closed) throw new illegalstateexception("cannot send after the producer is closed."); recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq); if (appendresult != null) return appendresult; }
在这里我们可以看出,多线程高并发情况下,dq会存在比较大的资源竞争,虽然是基于内存的操作,每个线程持有锁的时间极短,但相比单线程情况,高并发情况下线程开辟较多,锁竞争和cpu上下文切换都比较频繁,会造成一定的性能损耗,产生阻塞耗时。
分析到这里你就会发现,其实kafkaproducer这个异步发送是建立在生产者和消费者模式上的,send的真正操作并不是直接异步发送,而是把数据放在一个中间队列中。那么既然有生产者在往内存队列中放入数据,那么必然会有一个专有的线程负责把这些数据真正发送出去。我们通过监控jvm线程信息可以看到,kafkaproducer创建后确实会启动一个守护线程用于消息的发送。
ok,我们再回到 kafkaproducer中,会看到里面有这样两个对象,sender就是kafka发送数据的后台线程
private final sender sender; private final thread iothread;
在kafkaproducer的构造函数中会启动sender线程
this.sender = new sender(logcontext, client, this.metadata, this.accumulator, maxinflightrequests == 1, config.getint(producerconfig.max_request_size_config), acks, retries, metricsregistry.sendermetrics, time.system, this.requesttimeoutms, config.getlong(producerconfig.retry_backoff_ms_config), this.transactionmanager, apiversions); string iothreadname = network_thread_prefix + " | " + clientid; this.iothread = new kafkathread(iothreadname, this.sender, true); this.iothread.start();
进入sender内部可以看到这个线程的作用就是一直轮询发送数据。
public void run() { log.debug("starting kafka producer i/o thread."); // main loop, runs until close is called while (running) { try { run(time.milliseconds()); } catch (exception e) { log.error("uncaught error in kafka producer i/o thread: ", e); } } log.debug("beginning shutdown of kafka producer i/o thread, sending remaining records."); // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. while (!forceclose && (this.accumulator.hasundrained() || this.client.inflightrequestcount() > 0)) { try { run(time.milliseconds()); } catch (exception e) { log.error("uncaught error in kafka producer i/o thread: ", e); } } if (forceclose) { // we need to fail all the incomplete batches and wake up the threads waiting on // the futures. log.debug("aborting incomplete batches due to forced shutdown"); this.accumulator.abortincompletebatches(); } try { this.client.close(); } catch (exception e) { log.error("failed to close network client", e); } log.debug("shutdown of kafka producer i/o thread has completed."); } /** * run a single iteration of sending * * @param now the current posix time in milliseconds */ void run(long now) { if (transactionmanager != null) { try { if (transactionmanager.shouldresetproducerstateafterresolvingsequences()) // check if the previous run expired batches which requires a reset of the producer state. transactionmanager.resetproducerid(); if (!transactionmanager.istransactional()) { // this is an idempotent producer, so make sure we have a producer id maybewaitforproducerid(); } else if (transactionmanager.hasunresolvedsequences() && !transactionmanager.hasfatalerror()) { transactionmanager.transitiontofatalerror(new kafkaexception("the client hasn't received acknowledgment for " + "some previously sent messages and can no longer retry them. it isn't safe to continue.")); } else if (transactionmanager.hasinflighttransactionalrequest() || maybesendtransactionalrequest(now)) { // as long as there are outstanding transactional requests, we simply wait for them to return client.poll(retrybackoffms, now); return; } // do not continue sending if the transaction manager is in a failed state or if there // is no producer id (for the idempotent case). if (transactionmanager.hasfatalerror() || !transactionmanager.hasproducerid()) { runtimeexception lasterror = transactionmanager.lasterror(); if (lasterror != null) maybeabortbatches(lasterror); client.poll(retrybackoffms, now); return; } else if (transactionmanager.hasabortableerror()) { accumulator.abortundrainedbatches(transactionmanager.lasterror()); } } catch (authenticationexception e) { // this is already logged as error, but propagated here to perform any clean ups. log.trace("authentication exception while processing transactional request: {}", e); transactionmanager.authenticationfailed(e); } } long polltimeout = sendproducerdata(now); client.poll(polltimeout, now); }
通过上面的分析我们可以看出producer.send操作本身其实是个基于内存的存储操作,耗时几乎可以忽略不计,但由于高并发情况下,线程同步会有一定的性能损耗,当然这个损耗在一般的应用场景下几乎是可以忽略不计的,但如果是数据量比较大,高并发的场景下会比较明显。
针对上面的问题分析,这里说下我个人的一些总结:
1、首先避免多线程操作producer发送数据,你可以采用生产者消费者模式把producer.send从你的多线程操作中解耦出来,维护一个你要发送的消息队列,单独开辟一个线程操作;
2、可能有的小伙伴会问,那么多创建几个producer的实例或者维护一个producer池可以吗,我原本也是这个想法,只是在测试中发现效果也不是很理想,我估计是由于创建producer实例过多,导致线程数量也跟着增加,本身的业务线程再加上kafka的线程,线程上下文切换比较频繁,cpu资源压力比较大,效率也不如单线程操作;
3、这个问题其实真是针对api操作来讲的,send操作并不是真正的数据发送,真正的数据发送由守护线程进行;按照kafka本身的设计思想,如果操作本身就成为了你性能的瓶颈,你应该考虑的是集群部署,负载均衡;
4、无锁才是真正的高性能;
上一篇: 女人和猫
下一篇: PHP中foreach()用法汇总