欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

消息中间件kafka

程序员文章站 2024-01-30 15:39:22
...

(1)安装ZooKeeper

① 下载:http://zookeeper.apache.org/releases.html#download
② 解压并进入ZooKeeper目录,如:D:\Kafka\zookeeper-3\conf
③ 将“zoo_sample.cfg”重命名为“zoo.cfg”
④ 打开“zoo.cfg”找到并编辑dataDir=D:\Kafka\zookeeper-3\tmp
⑤ 添加系统变量:ZOOKEEPER_HOME=D:\Kafka\zookeeper-3
⑥ 编辑path系统变量,添加路径:%ZOOKEEPER_HOME%\bin
⑦ 在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)
⑧ 打开新的cmd,输入“zkServer“,运行Zookeeper
⑨ 命令行提示如下:说明本地Zookeeper启动成功
最底下出现(0.0.0.0:2181)说明成功启动!!!记得不要关这窗口!!!

(2)安装Kafka

① 下载:http://kafka.apache.org/downloads
② 解压并进入Kafka目录,笔者:D:\Kafka\kafka_2.12
③ 进入config目录找到文件server.properties并打开
④ 找到并编辑log.dirs=D:\Kafka\kafka_2.12\kafka-logs
⑤ 找到并编辑zookeeper.connect=localhost:2181
⑥ Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认 端口:2181
⑦ 进入Kafka安装目录D:\Kafka\kafka_2.12,按下Shift+右键,选择“打 开命令窗口”选项,打开命令行.
⑧ 输: .\bin\windows\kafka-server-start.bat .\config\server.pro perties
成功启动!!!记得不要关这窗口!!!

(3)测试

① .创建主题,进入Kafka安装目录按下Shift+右键,选择“打开命令窗口”选项,打开命令行
② 输:.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic linlin001
③ 成功创建!!!记得不要关这窗口!!!
④ 查看主题输入:.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
⑤ 创建生产者
⑥ .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic linlin001
⑦ 成功创建!!!记得不要关这窗口!!!
⑧ 创建消费者
⑨ .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic linlin001 --from-beginning
成功创建!!!记得不要关这窗口!!!(最后在生产者窗口写内容在消费者窗口能看到就说明大功告成!!!)

(4)Springboot中配置kafka

①引入依赖:

<!-- 消息中间件kafka依赖 -->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

②在yml配置文件内配置

	#kafka地址 brokers集群地址用,隔开
  kafka: 
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    #生产者的配置,大部分我们可以使用默认的,这里列出几个比较重要的属性
    #每批次发送消息的数量
    producer: 
      batch-size: 16
      #发送失败重试次数
      retries: 0
      #即32MB的批处理缓冲区
      buffer-memory: 33554432
      #key序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    #消费者的配置
    ##Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
    consumer:
      auto-offset-reset: latest
      #是否开启自动提交
      enable-auto-commit: true
      #自动提交的时间间隔
      auto-commit-interval: 100
      #key的解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #value的解码方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #在kafka/config文件的consumer.properties中有配置
      group-id: test-consumer-group
相关标签: Java后端-中间件