kafka生产实践(详解)
1.引言
最近接触到一个app流量分析的项目,类似于友盟。涉及到几个c端(客户端)高并发的接口,这几个接口主要用于c端数据的提交。在没有任何缓冲的情况下,一个接口涉及到5张表的提交。压测的结果很不理想,主要瓶颈就在与rds的交互。
一台双核,16g机子,单实例,jdbc最大连接数100,吞吐量竟然只有50tps。
能想到的改造方案就是引入一层缓冲,让c端接口不与rds直接交互,很自然就想到了rabbitmq,但是rabbitmq对分布式的支持比较一般,我们的数据体量也比较大,所以我们借鉴了友盟,引入了kafka,kafka是一种高吞吐量的分布式发布订阅消息系统,起初在不做任何kafka优化的时候,简单地将c端提交的数据直接send到单节点kafka,就这样,我们的吞吐量达到了100tps.还是有点小惊喜的。
最近一段时间研究了一下kafka,对一些参数进行调整,目前接口的吞吐量已经达到220tps,写这篇文章主要想记录一下自己优化和部署经历。
2.kafka简介
kafka的结构图
这张图很好的诠释了kafka的结构,但是遗漏了一点,就是group的概念,我这里补充一下,一个组可以包含多个consumer对多个topic进行消费,但是不同组的消费都是独立的。
也就是说同一个topic的同一条消息可以被不同组的consumer消费。
我这里的主要的优化途径就是将kafka集群化,多partition化,使其并发度更高。
集群化都很好理解,那什么是多partition?
partition是topic的一个概念,即对topic进行分组,不同partition之间的消费相互独立,并且有序。并且一个partiton只能被一个消费者消费,所以咯,假如topic只有一个partition的话,那么消费者实例不能大于一个,那实例再多也没用,受限于kafka的partition。
上面都是讲消费,其实send操作也是一样的,要保证有序必然要等上一个发送ack之后,下一个发送才能进行,如果只有一个partition,那send之后的ack的等待时间必然会阻塞下面一次send,设计多个partition之后,可以同时往多个partition发送消息,自然吞吐量也就上去。
3.kafka集群的搭建以及参数配置
集群搭建
准备两台机子,然后去官网(http://kafka.apache.org/downloads)下载一个包。通过scp到服务器上,解压进入config目录,编辑server.config.
第一台机子配置(172.18.240.36):
broker.id=0 每台服务器的broker.id都不能相同 #hostname host.name=172.18.240.36 #在log.retention.hours=168 下面新增下面三项 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #设置zookeeper的连接端口 zookeeper.connect=172.18.240.36:4001 #默认partition数 num.partitions=2
第二台机子配置(172.18.240.62):
broker.id=1 每台服务器的broker.id都不能相同
#hostname host.name=172.18.240.62 #在log.retention.hours=168 下面新增下面三项 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #设置zookeeper的连接端口 zookeeper.connect=172.18.240.36:4001 #默认partition数 num.partitions=2
新增或者修改成以上配置。
对了,在此之前请先安装zookeeper,如果你用的是zookeeper集群的话,zookeeper.connect可以填写多个,中间用逗号隔开。
然后启动
nohup ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &
测试一下:
在第一台机子上开启一个producer
./kafka-console-producer.sh --broker-list 172.18.240.36:9092 --topic test-test
在第二台机子上开启一个consumer
./kafka-console-consumer.sh --bootstrap-server 172.18.240.62:9092 --topic test-test --from-beginning
第一台机子发送一条消息
第二台机子立马收到消息
这样kafka的集群部署就完成了。就下来我们来看看,java的客户端代码如何编写。
4.kafka客户端代码示例
我这里的工程是建立在spring boot 之下的,仅供参考。
在 application.yml下添加如下配置:
kafka: consumer: default: server: 172.18.240.36:9092,172.18.240.62:9092 enableautocommit: false autocommitintervalms: 100 sessiontimeoutms: 15000 groupid: data_analysis_group autooffsetreset: latest producer: default: server: 172.18.240.36:9092,172.18.240.62:9092 retries: 0 batchsize: 4096 lingerms: 1 buffermemory: 40960
添加两个配置类
package com.dtdream.analysis.config; import org.apache.kafka.clients.consumer.consumerconfig; import org.apache.kafka.clients.consumer.consumerrecord; import org.apache.kafka.common.serialization.stringdeserializer; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.boot.context.properties.configurationproperties; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.kafka.annotation.enablekafka; import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory; import org.springframework.kafka.config.kafkalistenercontainerfactory; import org.springframework.kafka.core.consumerfactory; import org.springframework.kafka.core.defaultkafkaconsumerfactory; import org.springframework.kafka.listener.concurrentmessagelistenercontainer; import org.springframework.kafka.listener.adapter.recordfilterstrategy; import java.util.hashmap; import java.util.map; @configurationproperties( prefix = "kafka.consumer.default" ) @enablekafka @configuration public class kafkaconsumerconfig { private static final logger log = loggerfactory.getlogger(kafkaconsumerconfig.class); private static string autocommitintervalms; private static string sessiontimeoutms; private static class keydeserializerclass = stringdeserializer.class; private static class valuedeserializerclass = stringdeserializer.class; private static string groupid = "test-group"; private static string autooffsetreset = "latest"; private static string server; private static boolean enableautocommit; public static string getserver() { return server; } public static void setserver(string server) { kafkaconsumerconfig.server = server; } public static boolean isenableautocommit() { return enableautocommit; } public static void setenableautocommit(boolean enableautocommit) { kafkaconsumerconfig.enableautocommit = enableautocommit; } public static string getautocommitintervalms() { return autocommitintervalms; } public static void setautocommitintervalms(string autocommitintervalms) { kafkaconsumerconfig.autocommitintervalms = autocommitintervalms; } public static string getsessiontimeoutms() { return sessiontimeoutms; } public static void setsessiontimeoutms(string sessiontimeoutms) { kafkaconsumerconfig.sessiontimeoutms = sessiontimeoutms; } public static class getkeydeserializerclass() { return keydeserializerclass; } public static void setkeydeserializerclass(class keydeserializerclass) { kafkaconsumerconfig.keydeserializerclass = keydeserializerclass; } public static class getvaluedeserializerclass() { return valuedeserializerclass; } public static void setvaluedeserializerclass(class valuedeserializerclass) { kafkaconsumerconfig.valuedeserializerclass = valuedeserializerclass; } public static string getgroupid() { return groupid; } public static void setgroupid(string groupid) { kafkaconsumerconfig.groupid = groupid; } public static string getautooffsetreset() { return autooffsetreset; } public static void setautooffsetreset(string autooffsetreset) { kafkaconsumerconfig.autooffsetreset = autooffsetreset; } @bean public kafkalistenercontainerfactory<concurrentmessagelistenercontainer<string, string>> kafkalistenercontainerfactory() { concurrentkafkalistenercontainerfactory<string, string> factory = new concurrentkafkalistenercontainerfactory<>(); factory.setconsumerfactory(consumerfactory()); factory.setconcurrency(10); factory.getcontainerproperties().setpolltimeout(3000); factory.setrecordfilterstrategy(new recordfilterstrategy<string, string>() { @override public boolean filter(consumerrecord<string, string> consumerrecord) { log.debug("partition is {},key is {},topic is {}", consumerrecord.partition(), consumerrecord.key(), consumerrecord.topic()); return false; } }); return factory; } private consumerfactory<string, string> consumerfactory() { return new defaultkafkaconsumerfactory<>(consumerconfigs()); } private map<string, object> consumerconfigs() { map<string, object> propsmap = new hashmap<>(); propsmap.put(consumerconfig.bootstrap_servers_config, server); propsmap.put(consumerconfig.enable_auto_commit_config, enableautocommit); propsmap.put(consumerconfig.auto_commit_interval_ms_config, autocommitintervalms); propsmap.put(consumerconfig.session_timeout_ms_config, sessiontimeoutms); propsmap.put(consumerconfig.key_deserializer_class_config, keydeserializerclass); propsmap.put(consumerconfig.value_deserializer_class_config, valuedeserializerclass); propsmap.put(consumerconfig.group_id_config, groupid); propsmap.put(consumerconfig.auto_offset_reset_config, autooffsetreset); return propsmap; } /* @bean public listener listener() { return new listener(); }*/ }
package com.dtdream.analysis.config; import org.apache.kafka.clients.producer.producerconfig; import org.apache.kafka.common.serialization.stringserializer; import org.springframework.boot.context.properties.configurationproperties; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.kafka.annotation.enablekafka; import org.springframework.kafka.core.defaultkafkaproducerfactory; import org.springframework.kafka.core.kafkatemplate; import org.springframework.kafka.core.producerfactory; import java.util.hashmap; import java.util.map; /** * created with intellij idea. * user: chenqimiao * date: 2017/7/24 * time: 9:43 * to change this template use file | settings | file templates. */ @configurationproperties( prefix = "kafka.producer.default", ignoreinvalidfields = true )//注入一些属性域 @enablekafka @configuration//使得@bean注解生效 public class kafkaproducerconfig { private static string server; private static integer retries; private static integer batchsize; private static integer lingerms; private static integer buffermemory; private static class keyserializerclass = stringserializer.class; private static class valueserializerclass = stringserializer.class; private map<string, object> producerconfigs() { map<string, object> props = new hashmap<>(); props.put(producerconfig.bootstrap_servers_config, server); props.put(producerconfig.retries_config, retries); props.put(producerconfig.batch_size_config, batchsize); props.put(producerconfig.linger_ms_config, lingerms); props.put(producerconfig.buffer_memory_config, buffermemory); props.put(producerconfig.key_serializer_class_config, keyserializerclass); props.put(producerconfig.value_serializer_class_config, valueserializerclass); return props; } private producerfactory<string, string> producerfactory() { return new defaultkafkaproducerfactory<>(producerconfigs()); } public static string getserver() { return server; } public static void setserver(string server) { kafkaproducerconfig.server = server; } public static integer getretries() { return retries; } public static void setretries(integer retries) { kafkaproducerconfig.retries = retries; } public static integer getbatchsize() { return batchsize; } public static void setbatchsize(integer batchsize) { kafkaproducerconfig.batchsize = batchsize; } public static integer getlingerms() { return lingerms; } public static void setlingerms(integer lingerms) { kafkaproducerconfig.lingerms = lingerms; } public static integer getbuffermemory() { return buffermemory; } public static void setbuffermemory(integer buffermemory) { kafkaproducerconfig.buffermemory = buffermemory; } public static class getkeyserializerclass() { return keyserializerclass; } public static void setkeyserializerclass(class keyserializerclass) { kafkaproducerconfig.keyserializerclass = keyserializerclass; } public static class getvalueserializerclass() { return valueserializerclass; } public static void setvalueserializerclass(class valueserializerclass) { kafkaproducerconfig.valueserializerclass = valueserializerclass; } @bean(name = "kafkatemplate") public kafkatemplate<string, string> kafkatemplate() { return new kafkatemplate<>(producerfactory()); } }
利用kafkatemplate即可完成发送。
@autowired private kafkatemplate<string,string> kafkatemplate; @requestmapping( value = "/openapp", method = requestmethod.post, produces = mediatype.application_json_utf8_value, consumes = mediatype.application_json_utf8_value ) @responsebody public resultdto openapp(@requestbody activelogpushbo activelogpushbo, httpservletrequest request) { logger.info("openapp: activelogpushbo {}, datetime {}", jsonobject.tojsonstring(activelogpushbo),new datetime().tostring("yyyy-mm-dd hh:mm:ss.sss")); string ip = (string) request.getattribute("ip"); activelogpushbo.setip(ip); activelogpushbo.setdate(new date()); //resultdto resultdto = datacollectionservice.collectopeninfo(activelogpushbo); kafkatemplate.send("data_collection_open",jsonobject.tojsonstring(activelogpushbo)); // logger.info("openapp: resultdto {} ,datetime {}", resultdto.tojsonstring(),new datetime().tostring("yyyy-mm-dd hh:mm:ss.sss")); return new resultdto().success(); }
kafkatemplate的send方法会更根据你指定的key进行hash,再对partition数进行去模,最后决定发送到那一个分区,假如没有指定key,那send方法对分区的选择是随机。具体怎么随机的话,这里就不展开讲了,有兴趣的同学可以自己看源码,我们可以交流交流。
接着配置一个监听器
package com.dtdream.analysis.listener; import org.apache.kafka.clients.consumer.consumerrecord; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.kafka.annotation.kafkalistener; import java.util.optional; @component public class listener { private logger logger = loggerfactory.getlogger(this.getclass()); @kafkalistener(topics = {"test-topic"}) public void listen(consumerrecord<?, ?> record) { optional<?> kafkamessage = optional.ofnullable(record.value()); if (kafkamessage.ispresent()) { object message = kafkamessage.get(); logger.info("message is {} ", message); } } }
@kafkalistener其实可以具体指定消费哪个分区,如果不指定的话,并且只有一个消费者实例,那么这个实例会消费所有的分区的消息。
消费者的数量是一定要少于partition的数量的,不然没有任何意义。会出现消费者过剩的情况。
消费者数量和partition数量的多与少,会动态影响消费节点所消费的partition数目,最终会在整个集群中达到一种动态平衡。
5.总结
理论上只要cpu核心数无限,那么partition数也可以无上限,与此同时消费者节点和生产者节点也可以无上限,最终会使单个topic的并发无上限。单机的cpu的核心数总是会达到一个上限,kafka作为分布式系统,可以很好利用集群的运算能力,进行动态扩展,在dt时代,应该会慢慢成为主流吧。
以上这篇kafka生产实践(详解)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。