Setting Up a Distributed Kafka Cluster
I. Versions
CentOS 7.5
ZooKeeper 3.4.12
Kafka 2.12-1.1.0
II. Installing ZooKeeper
1. Download and extract the ZooKeeper archive.
2. Create the data and log directories:
mkdir /usr/local/zookeeper-3.4.12/data
mkdir /usr/local/zookeeper-3.4.12/logs
3. Copy the sample configuration file.
Enter the conf directory and copy zoo_sample.cfg:
cp zoo_sample.cfg zoo.cfg
4. Enter the data directory and write this server's id:
echo 1 > myid
Use a different id on each server (1, 2, 3), matching the server.N entries in zoo.cfg below.
5. Edit the configuration file (zoo.cfg):
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# The directory where the snapshot is stored.
# Do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper-3.4.12/data
dataLogDir=/usr/local/zookeeper-3.4.12/logs
# The port at which the clients will connect
clientPort=2181
# The maximum number of client connections.
# Increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# Cluster server addresses
server.1=ip1:2888:3888
server.2=ip2:2888:3888
server.3=ip3:2888:3888
6. Start ZooKeeper.
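On each of the three servers, ZooKeeper can be started and checked with the scripts bundled in the distribution (a sketch, assuming the install prefix used above; it requires the cluster nodes to be reachable):

```shell
# Start ZooKeeper on every node, then check each node's role.
/usr/local/zookeeper-3.4.12/bin/zkServer.sh start
/usr/local/zookeeper-3.4.12/bin/zkServer.sh status
# In a healthy ensemble, status reports "Mode: leader" on one node
# and "Mode: follower" on the other two.
```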
III. Installing Kafka
1. Download and extract the Kafka archive:
tar -zvxf kafka_2.12-1.1.0.tgz
2. Edit the configuration file.
Open the Kafka configuration file:
vim server.properties
Modify the relevant settings:
# Broker id; must be a unique number. Use 1, 2, 3 on the three servers.
broker.id=1
# Listener address advertised to clients
advertised.listeners=PLAINTEXT://<server-ip>:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# Number of threads handling network requests
num.network.threads=3
# Number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# Data storage path
log.dirs=/tmp/kafka-logs
# Default number of partitions per topic
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
# In a cluster, set these above 1 for availability; here we use 3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

############################# Log Flush Policy #############################

# Log retention period
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
#group.initial.rebalance.delay.ms=0

# Whether to auto-create topics (false = no, true = yes)
auto.create.topics.enable=false
# Allow topic deletion; the default is false
delete.topic.enable=true
3. Start Kafka (the -daemon flag already runs it in the background):
bin/kafka-server-start.sh -daemon config/server.properties
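Once Kafka is running on all three servers, you can confirm that every broker registered itself in ZooKeeper, using the zookeeper-shell script shipped with Kafka (a sketch; it requires a live cluster, and the broker ids follow the broker.id values above):

```shell
# List the broker ids currently registered in ZooKeeper.
bin/zookeeper-shell.sh ip1:2181 ls /brokers/ids
# A healthy three-node cluster lists all three ids: [1, 2, 3]
```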
IV. Key Parameters
Broker (server-side) configuration
message.max.bytes (default: 1 MB) – the largest message the broker will accept. It should be greater than or equal to the producer's max.request.size, and less than or equal to the consumer's fetch.message.max.bytes; otherwise the broker accepts messages that consumers cannot fetch, and consumption gets stuck.
log.segment.bytes (default: 1 GB) – the size of a Kafka data (segment) file. Make sure it is larger than a single message. The default is usually fine: a single message rarely exceeds 1 GB, since Kafka is a messaging system, not a file system.
replica.fetch.max.bytes (default: 1 MB) – the largest message a broker can replicate. It should be larger than message.max.bytes; otherwise the broker accepts messages it cannot replicate, risking data loss.
Consumer configuration
fetch.message.max.bytes (default: 1 MB) – the largest message a consumer can read. It should be greater than or equal to message.max.bytes. So if you do choose Kafka to carry large messages, plan for their impact on the cluster and its topics at design time, rather than working out a fix after problems appear.
Producer configuration
buffer.memory (default: 32 MB) – the size of the producer's buffer. A large enough buffer lets the producer keep writing, but buffered records have not necessarily been sent yet.
batch.size (default: 16384 bytes) – the size of each batch. A batch becomes eligible to send once it reaches this size, and the buffer can hold multiple batches.
linger.ms (default: 0) – the maximum time to wait before sending a batch that has not reached batch.size; once it elapses, the batch is sent anyway.
max.request.size (default: 1 MB) – the maximum size of a single producer request; its value must be larger than batch.size.
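The size relationships described in this section can be sanity-checked before deployment. A minimal sketch in shell, using the default values quoted above (the variable names are illustrative, not Kafka settings):

```shell
# Illustrative values in bytes, mirroring the defaults discussed above.
max_request_size=1048576        # producer max.request.size (1 MB)
batch_size=16384                # producer batch.size
message_max_bytes=1048576       # broker message.max.bytes (1 MB)
replica_fetch_max_bytes=1048576 # broker replica.fetch.max.bytes (1 MB)
fetch_message_max_bytes=1048576 # consumer fetch.message.max.bytes (1 MB)

# The constraints this section describes:
[ "$message_max_bytes" -ge "$max_request_size" ]        && echo "broker can accept the largest producer request"
[ "$replica_fetch_max_bytes" -ge "$message_max_bytes" ] && echo "followers can replicate the largest accepted message"
[ "$fetch_message_max_bytes" -ge "$message_max_bytes" ] && echo "consumers can fetch the largest accepted message"
[ "$max_request_size" -ge "$batch_size" ]               && echo "a full batch fits in one request"
```

If any of the four checks stays silent, raising a message size limit in isolation will lose or strand messages downstream.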
V. Notes
1. To keep all partitions available, set offsets.topic.replication.factor to at least 3.
2. Disable automatic topic creation, and try to ensure all brokers in the cluster are up before clients start consuming; otherwise partitions and their replicas may not be evenly distributed, hurting availability.
3. After the cluster starts, you can inspect partition and replica placement with:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets
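To observe replica placement on a topic of your own, you can create a small test topic and describe it (a sketch; it requires the running cluster, and the topic name is illustrative):

```shell
# Create a topic with 3 partitions, each replicated to all 3 brokers.
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 3 --partitions 3 --topic cluster-test
# Inspect its layout: each partition line lists its leader,
# replica set, and in-sync replicas (Isr).
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic cluster-test
```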