入门kafka
程序员文章站
2022-07-14 21:33:18
...
文章目录
一、安装kafka
1、安装环境组件
1.1、安装jdk
1.2、安装zookeeper
kafka使用zookeeper保存集群的元数据信息和消费者信息。kafka发行版自带了zookeeper,可以直接从脚本启动,也可以手动安装。
1.2.1、单机服务
- 下载稳定版本[链接](https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/) ,我这里下载的版本号3.5.6
以下操作sudo是为了防止没有管理员root权限。
解压zookeeper
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz
移到系统用户目录下
sudo mv apache-zookeeper-3.5.6-bin /usr/local/zookeeper
创建数据保存目录
sudo mkdir -p /var/lib/zookeeper
修改zoo.cfg配置文件
cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
EOF
设置可执行Java环境如果自己安装则(假设安装目录为如下/usr/java/jdk文件夹名称)
export JAVA_HOME=/usr/java/jdk文件夹名称
如果是系统自己rpm安装的,则Java -version可以执行。则可以绕过设置JAVA_HOME目录步骤
sudo /usr/local/zookeeper/bin/zkServer.sh start
- 效果图
现在链接zookeeper端口,通过发送四字命令srvr来验证zookeeper是否安装正确。
假设没有安装telnet,则需要brew install telnet,没有brew的话 则需要安装下。
telnet localhost 2181
返回结果图:
1.2.2、zookeeper群组服务
zookeeper集群被称为群组,zookeeper使用的是一致性协议,所以建议每个群组里应该包含奇数个节点,只有当群组的大多数节点可用状态,zookeeper才能处理外部请求。如果群组包含5个节点,可以允许2个节点失效,如果包含3个节点,允许1个节点失效。
- 那么群组节点越多越好么?当然不是,节点越多在选举的时候性能越低。所以一般建议不超过7个节点。
-
群组需要公共配置,
- 1、每个服务器需要在数据目录中创建一个myid文件,用于指明自己的ID号。
- 2、假设群组里面的机器名字zoo1.example.com,zoo2.example.com,zoo3.example.com那么配置文件可能是这样的:
server.X=hostname:peerPort:leaderPort
X:代表服务器的ID,必须是整数,但是不一定从0开始也不要求是连续的。
hostname:服务器的机器名或者IP地址
peerPort:用于节点间通信的TCP端口
leaderPort:用于首领选举的TCP端口
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=zoo1.example.com:2888:3888
server.2=zoo2.example.com:2888:3888
server.3=zoo3.example.com:2888:3888
客户端只需要通过clientPort就能连接到群组,但是群组节点间的通信需要同时用到3个端口(peerPort,leaderPort,clientPort)
1.3、安装kafka broker
在Java和zookeeper都安装好的基础上,可以安装kafka了。安装目录usr/local/kafka,将消息日志保存在/tmp/kafka-logs目录下。
下载地址:点击
解压文件
tar -zxvf kafka_2.12-2.3.1.tgz
移动到系统用户目录下
sudo mv kafka_2.12-2.3.1 /usr/local/kafka
创建存放日志区
mkdir /tmp/kafka-logs
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
创建主题wolf
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wolf
查看主题wolf
/usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic wolf
生产消息
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wolf
消费消息
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic wolf --from-beginning
注意事项zookeeper is not a recognized option
如果执行/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wolf --from-beginning
报错:zookeeper is not a recognized option
更改为/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic wolf --from-beginning
因为低版本的kafka的zookeeper命令行替换为bootstrap-server
效果图:
1.4、broker配置
1.4.1、常规配置
- broker.id(每个broker需要维护的标识符,不能重复,最好是根据机器的名字映射好)
- port(默认使用配置文件,会监听9092的端口,修改port参数可以修改为别的端口,如果使用1024以下的端口,需要使用root权限启动kafka。)
- zookeeper.connect(用于保存broker元数据的地址,比如本地的localhost:2181,最好是列表,防止单个broker故障带来的不可用服务)
- log.dirs(所有消息存储到磁盘是根据log.dirs指定的,它是一组用逗号分隔的本地文件系统路径,broker自我调优是根据最少使用原则,把同一个分区的日志片段保存到同一个路径下,要注意的是,broker往拥有最少的数目分区的路径新增分区,而不是往拥有最小的磁盘空间的路径新增分区)
- num.recovery.threads.per.data.dir(假设该数设置8,log.dir=3则一共需要24个线程用于恢复数据主要有三种情况:服务器正常启动,用于打开每个分区的日志片段,服务器奔溃后重启,用于检查和截断每个分区的日志片段,服务器正常关闭,用于关闭日志片段。)
- auto.create.topics.enable(默认情况下,自动创建主题的三种场景:当一个生产者开始往主题写入消息时候,或者当一个消费者开始从主题读取消息时候,或者当任意一个客户端向主题发送元数据请求的时候)
1.4.2、主题的默认配置
- num.partitions(默认启用,指定了新创建的主题包含多少个分区,我们可 以增加主题分区的个数,但是不能减少分区的个数。)
1.1、如何选定适合自己的分区数可以参考这几个因素:
1.1.1、主题需要达到多大的吞吐量,希望每秒写入100kb还是1GB?
1.1.2、从单个分区读取数据的最大吞吐量多少?消费者将数据写入数据库的速度不会超过50MB/秒,从分区的读取速度也就不会超过50MB/秒
1.1.3、可以通过类似的方法估算生产者向单个分区写入数据的吞吐量,不过生产者的速度一般比消费者快得多。所以最好为生产者多估算一些吞吐量
1.1.4、每个broker包含的分区个数,可用的磁盘空间和网络带宽
1.1.5、如果消息是按照不同的键来写入分区,那么为已有的主题新增分区会很困难
1.1.6、单个broker对分区个数是有限制的,因为分区越多,占用的内存越多,完成首领选举需要的时间越长。
假设根据以上数据,每秒钟要从主题写入和读取1GB的数据,并且每个消费者每秒钟可以处理50MB的数据,那么至少需要20个分区。如果对于这些不能估算,可以分区的大小限制在25GB以内(经验值) - log.retention.ms
kafka通常根据时间来决定数据可以被保留多久,默认使用log.retention.hours参数来配置时间,默认值168小时。除此之外,还有其他两个参数log.retention.minutes和log.retention.ms。如果指定了多个时间参数,系统会优先选择最小值的那个参数。
根据时间保留数据和最后修改时间:
通过检查磁盘上日志片段文件的最后修改时间来实现的。一般来说,最后修改时间指的就是日志片段的关闭时间,也就是文件里最后一个消息的时间戳。 - log.retention.bytes
3.1、另一种方式通过保留的消息字节数来判断消息是否过期。它的值通过参数log.retention.bytes来指定,作用在每一个分区上。eg:假设8个分区,并且log.retention.bytes被设为1GB。那么这个主题最多可以保留8GB的数据。
3.2、 log.retention.bytes和log.retention.ms同时指定
根据字节大小和时间保留数据,同时指定 log.retention.bytes和log.retention.bytes那么满足其中一个条件则消息就会被删除。 - log.segment.bytes
当日志片段大小达到log.segment.bytes指定的上限(默认1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。这个参数越小,就会越频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。
使用时间戳获取偏移量:
日志片段的大小会影响使用时间戳获取偏移量。日志片段越小,结果越准确。 - log.segment.ms
指定多长时间之后日志片段会被关闭。与log.segment.bytes一起合起来使用,也是先满足哪个然后哪个配置生效 - message.max.bytes
broker通过设置该参数来限制单个消息的大小。默认值是1000 000也就是1MB。这个值是压缩后的消息体值,对性能有显著影响。
在服务端和客户端之间协调消息大小的配置:
消费者客户端可以通过设置fetch.message.max.bytes必须与服务器端设置的消息大小进行协调。如果这个值比message.max.bytes小,那么消费者就会出现阻塞。