欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

kafka分布式集群搭建

程序员文章站 2022-06-30 08:39:29
一、版本 CentOS 7.5 zookeeper-3.4.12 kafka _2.12-1.1.0 二、zookeeper安装 1、下载解压zookeeper压缩包 tar -zvxf zookeeper-3.4.12.tar.gz 2、创建数据与日志文件夹 mkdir /usr/local/zo ......

一、版本

centos 7.5

zookeeper-3.4.12

kafka _2.12-1.1.0

二、zookeeper安装

1、下载解压zookeeper压缩包

 
tar -zvxf zookeeper-3.4.12.tar.gz
 

2、创建数据与日志文件夹

mkdir /usr/local/zookeeper-3.4.12/data
mkdir /usr/local/zookeeper-3.4.12/logs

3、复制配置文件

进入conf目录,复制zoo_sample.cfg

cp zoo_sample.cfg zoo.cfg

4、进入data目录,执行命令

echo 1 > myid
创建myid文件并输入值为1,依次在另外两台机器上执行同样的操作,myid的数值依次为2,3配置成功;

5、修改配置文件

# the number of milliseconds of each tick
ticktime=2000
# the number of ticks that the initial
# synchronization phase can take
initlimit=10
# the number of ticks that can pass between
# sending a request and getting an acknowledgement
synclimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
datadir=/usr/local/zookeeper-3.4.12/data
datalogdir=/usr/local/zookeeper-3.4.12/logs
# the port at which the clients will connect
clientport=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxclientcnxns=60
#
# be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperadmin.html#sc_maintenance
#
# the number of snapshots to retain in datadir
#autopurge.snapretaincount=3
# purge task interval in hours
# set to "0" to disable auto purge feature
#autopurge.purgeinterval=1
#集群服务器地址
server.1=ip1:2888:3888
server.2=ip2:2888:3888
server.3=ip3:2888:3888

6、启动zookeeper

sh zkserver.sh start

三 、kafka安装

1、下载并解压kafka压缩包

tar -zvxf  kafka_2.12-1.1.0.tgz

2、修改配置文件

打开kafka配置文件

vim server.properties

修改相关配置

# 服务器id,设置为唯一数字。三台服务器可分别设置为1、2、3
broker.id=1

#监听地址
advertised.listeners=plaintext://ip地址:9092

# maps listener names to security protocols, the default is for them to be the same. see the config documentation for more details
#listener.security.protocol.map=plaintext:plaintext,ssl:ssl,sasl_plaintext:sasl_plaintext,sasl_ssl:sasl_ssl

#kafka网络通信的线程数
num.network.threads=3

#kafka io操作的线程数
num.io.threads=8

# the send buffer (so_sndbuf) used by the socket server
socket.send.buffer.bytes=102400

# the receive buffer (so_rcvbuf) used by the socket server
socket.receive.buffer.bytes=102400

# the maximum size of a request that the socket server will accept (protection against oom)
socket.request.max.bytes=104857600


#数据存储路径
log.dirs=/tmp/kafka-logs

#默认partition的数量
num.partitions=1

# the number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# this value is recommended to be increased for installations with data dirs located in raid array.
num.recovery.threads.per.data.dir=1

#集群状态下,为保证可用性,需要设置为大于1,这里设置为3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

############################# log flush policy #############################


#日志保存时长
log.retention.hours=168

# a size-based retention policy for logs. segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# the maximum size of a log segment file. when this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# the interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# zookeeper #############################

# zookeeper connection string (see zookeeper docs for details).
# this is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# you can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# group coordinator settings #############################

# the following configuration specifies the time, in milliseconds, that the groupcoordinator will delay the initial consumer rebalance.
# the rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# the default value for this is 3 seconds.
# we override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# however, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
#group.initial.rebalance.delay.ms=0

# 是否自动创建主题 flase 否  true 是
auto.create.topics.enable=false

## 允许删除主题,默认是false
delete.topic.enable=true

3、启动kafka

bin/kafka-server-start.sh -daemon config/server.properties &

四、相关参数

broker 服务端 配置

message.max.bytes ( 默认:1m) – broker能接收消息的最大字节数,该值应该大于等于生产者的max.request.size,小于等于消费者的fetch.message.max.bytes,否则broker就会因为消费端无法使用这个消息而挂起。
log.segment.bytes (默认:1gb) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1g,因为这是一个消息系统,而不是文件系统)。
replica.fetch.max.bytes (默认::1mb) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。

consumer 消费者 配置

fetch.message.max.bytes (默认 1mb) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。

producer 生产者 配置

buffer.memory(默认:32m)– 生产者缓冲区大小设置,如果缓冲区足够大,生产者可以一直写入,但并不代表消息被真正send;

batch.size(默认:16384 byte)– 每个数据包的大小设置,数据包达到指定大小后就可以被发送了,缓冲区内会存在多个数据包;

linger.ms –  如果数据包大小一直没有达到batch.size,设置最多等待多久,消息会发送出去;

max.request.size(默认:1m) 生产者一次发送数据的最大大小,它的值要大于batch.size 的大小

五、注意事项

1、为保证所有分区可用,offsets.topic.replication.factor至少配置为3;

2、关闭自动创建主题,同时尽量保证集群所有broker启动后,再开始客户端消费,否则无法保证partition及其副本均匀分布,影响高可用;

3、集群启动后,可通过命令查看分区及其副本分布情况;

bin/kafka-topics.sh --describe --zookeeper localhost:2182 --topic __consumer_offsets

 

 

关注微信公众号,查看更多技术文章。

 

kafka分布式集群搭建