欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

入门kafka

程序员文章站 2022-07-14 21:33:18
...

一、安装kafka

1、安装环境组件

1.1、安装jdk

1.2、安装zookeeper

kafka使用zookeeper保存集群的元数据信息和消费者信息。kafka发行版自带了zookeeper,可以直接从脚本启动,也可以手动安装。
入门kafka

1.2.1、单机服务
  - 下载稳定版本[链接](https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/) ,我这里下载的版本号3.5.6
以下操作sudo是为了防止没有管理员root权限。
解压zookeeper
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz
移到系统用户目录下
sudo mv apache-zookeeper-3.5.6-bin /usr/local/zookeeper
创建数据保存目录
sudo mkdir -p /var/lib/zookeeper
修改zoo.cfg配置文件
cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
EOF
设置可执行Java环境如果自己安装则(假设安装目录为如下/usr/java/jdk文件夹名称)
export JAVA_HOME=/usr/java/jdk文件夹名称
如果是系统自己rpm安装的,则Java -version可以执行。则可以绕过设置JAVA_HOME目录步骤
sudo /usr/local/zookeeper/bin/zkServer.sh start
  - 效果图

入门kafka
现在链接zookeeper端口,通过发送四字命令srvr来验证zookeeper是否安装正确。

假设没有安装telnet,则需要brew install telnet,没有brew的话 则需要安装下。
telnet localhost 2181

返回结果图:入门kafka

1.2.2、zookeeper群组服务

zookeeper集群被称为群组,zookeeper使用的是一致性协议,所以建议每个群组里应该包含奇数个节点,只有当群组的大多数节点可用状态,zookeeper才能处理外部请求。如果群组包含5个节点,可以允许2个节点失效,如果包含3个节点,允许1个节点失效。

  • 那么群组节点越多越好么?当然不是,节点越多在选举的时候性能越低。所以一般建议不超过7个节点。
  • 群组需要公共配置,
    • 1、每个服务器需要在数据目录中创建一个myid文件,用于指明自己的ID号。
    • 2、假设群组里面的机器名字zoo1.example.com,zoo2.example.com,zoo3.example.com那么配置文件可能是这样的:
      server.X=hostname:peerPort:leaderPort
      X:代表服务器的ID,必须是整数,但是不一定从0开始也不要求是连续的。
      hostname:服务器的机器名或者IP地址
      peerPort:用于节点间通信的TCP端口
      leaderPort:用于首领选举的TCP端口
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=zoo1.example.com:2888:3888
server.2=zoo2.example.com:2888:3888
server.3=zoo3.example.com:2888:3888

客户端只需要通过clientPort就能连接到群组,但是群组节点间的通信需要同时用到3个端口(peerPort,leaderPort,clientPort)

1.3、安装kafka broker

在Java和zookeeper都安装好的基础上,可以安装kafka了。安装目录usr/local/kafka,将消息日志保存在/tmp/kafka-logs目录下。
入门kafka
下载地址:点击

解压文件
tar -zxvf kafka_2.12-2.3.1.tgz
移动到系统用户目录下
sudo mv kafka_2.12-2.3.1 /usr/local/kafka
创建存放日志区
mkdir /tmp/kafka-logs
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
创建主题wolf
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wolf
查看主题wolf
/usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic wolf
生产消息
 /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wolf
 消费消息
 /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic wolf --from-beginning
 

注意事项zookeeper is not a recognized option

如果执行/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wolf --from-beginning
报错:zookeeper is not a recognized option
更改为/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic wolf --from-beginning
因为低版本的kafka的zookeeper命令行替换为bootstrap-server

效果图:入门kafka

1.4、broker配置

1.4.1、常规配置
  1. broker.id(每个broker需要维护的标识符,不能重复,最好是根据机器的名字映射好)
  2. port(默认使用配置文件,会监听9092的端口,修改port参数可以修改为别的端口,如果使用1024以下的端口,需要使用root权限启动kafka。)
  3. zookeeper.connect(用于保存broker元数据的地址,比如本地的localhost:2181,最好是列表,防止单个broker故障带来的不可用服务)
  4. log.dirs(所有消息存储到磁盘是根据log.dirs指定的,它是一组用逗号分隔的本地文件系统路径,broker自我调优是根据最少使用原则,把同一个分区的日志片段保存到同一个路径下,要注意的是,broker往拥有最少的数目分区的路径新增分区,而不是往拥有最小的磁盘空间的路径新增分区)
  5. num.recovery.threads.per.data.dir(假设该数设置8,log.dir=3则一共需要24个线程用于恢复数据主要有三种情况:服务器正常启动,用于打开每个分区的日志片段,服务器奔溃后重启,用于检查和截断每个分区的日志片段,服务器正常关闭,用于关闭日志片段。)
  6. auto.create.topics.enable(默认情况下,自动创建主题的三种场景:当一个生产者开始往主题写入消息时候,或者当一个消费者开始从主题读取消息时候,或者当任意一个客户端向主题发送元数据请求的时候)
1.4.2、主题的默认配置
  1. num.partitions(默认启用,指定了新创建的主题包含多少个分区,我们可 以增加主题分区的个数,但是不能减少分区的个数。)
    1.1、如何选定适合自己的分区数可以参考这几个因素:
    1.1.1、主题需要达到多大的吞吐量,希望每秒写入100kb还是1GB?
    1.1.2、从单个分区读取数据的最大吞吐量多少?消费者将数据写入数据库的速度不会超过50MB/秒,从分区的读取速度也就不会超过50MB/秒
    1.1.3、可以通过类似的方法估算生产者向单个分区写入数据的吞吐量,不过生产者的速度一般比消费者快得多。所以最好为生产者多估算一些吞吐量
    1.1.4、每个broker包含的分区个数,可用的磁盘空间和网络带宽
    1.1.5、如果消息是按照不同的键来写入分区,那么为已有的主题新增分区会很困难
    1.1.6、单个broker对分区个数是有限制的,因为分区越多,占用的内存越多,完成首领选举需要的时间越长。
    假设根据以上数据,每秒钟要从主题写入和读取1GB的数据,并且每个消费者每秒钟可以处理50MB的数据,那么至少需要20个分区。如果对于这些不能估算,可以分区的大小限制在25GB以内(经验值)
  2. log.retention.ms
    kafka通常根据时间来决定数据可以被保留多久,默认使用log.retention.hours参数来配置时间,默认值168小时。除此之外,还有其他两个参数log.retention.minutes和log.retention.ms。如果指定了多个时间参数,系统会优先选择最小值的那个参数。
    根据时间保留数据和最后修改时间:
    通过检查磁盘上日志片段文件的最后修改时间来实现的。一般来说,最后修改时间指的就是日志片段的关闭时间,也就是文件里最后一个消息的时间戳。
  3. log.retention.bytes
    3.1、另一种方式通过保留的消息字节数来判断消息是否过期。它的值通过参数log.retention.bytes来指定,作用在每一个分区上。eg:假设8个分区,并且log.retention.bytes被设为1GB。那么这个主题最多可以保留8GB的数据。
    3.2、 log.retention.bytes和log.retention.ms同时指定
    根据字节大小和时间保留数据,同时指定 log.retention.bytes和log.retention.bytes那么满足其中一个条件则消息就会被删除。
  4. log.segment.bytes
    当日志片段大小达到log.segment.bytes指定的上限(默认1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。这个参数越小,就会越频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。
    使用时间戳获取偏移量:
    日志片段的大小会影响使用时间戳获取偏移量。日志片段越小,结果越准确。
  5. log.segment.ms
    指定多长时间之后日志片段会被关闭。与log.segment.bytes一起合起来使用,也是先满足哪个然后哪个配置生效
  6. message.max.bytes
    broker通过设置该参数来限制单个消息的大小。默认值是1000 000也就是1MB。这个值是压缩后的消息体值,对性能有显著影响。
    在服务端和客户端之间协调消息大小的配置:
    消费者客户端可以通过设置fetch.message.max.bytes必须与服务器端设置的消息大小进行协调。如果这个值比message.max.bytes小,那么消费者就会出现阻塞。

1.5、硬件的选择

1.5.1、磁盘吞吐量
1.5.2、磁盘容量
1.5.3、内存
1.5.4、网络
1.5.5、CPU

1.6、云端的Kafka

1.7、Kafka集群