
Spring Boot Kafka

Source: 程序员文章站, 2022-04-28 23:06:43

1. Create a cluster

http://kafka.apache.org/documentation/#quickstart

One sentence in the quickstart strikes me as especially important: For Kafka, a single broker is just a cluster of size one.

1.1. Command line

# Extract the archive
tar -zxf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

# Start ZooKeeper (runs in the foreground; use a separate terminal for the following commands)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start the first Kafka broker
bin/kafka-server-start.sh config/server.properties &

# Copy the broker config for two additional brokers
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

# Edit the copies so that each broker has its own id, port and log directory
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

# Start the two additional brokers
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

# Create a topic that is replicated across the three brokers
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic myTopic

# Describe the topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTopic
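
The two extra broker files in this section differ from the base file in only three settings, each of which follows from the broker id. The following is a minimal sketch of that rule using only java.util.Properties. The class name BrokerOverrides and the helper overridesFor are hypothetical, and the sketch assumes the base broker listens on port 9092 and that the log directory is set through the log.dirs key, as in the stock config/server.properties.

```java
import java.util.Properties;

public class BrokerOverrides {

    // Assumption: broker 0 uses the default client port 9092.
    private static final int BASE_PORT = 9092;

    /** Returns the three settings that must be unique for the broker with the given id. */
    static Properties overridesFor(int brokerId) {
        Properties p = new Properties();
        p.setProperty("broker.id", String.valueOf(brokerId));
        p.setProperty("listeners", "PLAINTEXT://:" + (BASE_PORT + brokerId));
        p.setProperty("log.dirs", "/tmp/kafka-logs-" + brokerId);
        return p;
    }

    public static void main(String[] args) {
        // Print the overrides for the two additional brokers.
        for (int id = 1; id <= 2; id++) {
            Properties p = overridesFor(id);
            System.out.println("config/server-" + id + ".properties:");
            for (String name : new String[] {"broker.id", "listeners", "log.dirs"}) {
                System.out.println("    " + name + "=" + p.getProperty(name));
            }
        }
    }
}
```

Brokers on the same machine cannot share a port or a log directory, which is why all three values have to change together.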


1.2. Graphical interface

Besides the command line, the cluster can also be inspected with kafka-manager.


2. Integrating Kafka with Spring Boot

2.1. Add the Maven dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.2. Configuration

spring:
  kafka:
    bootstrap-servers: 10.123.52.76:9092,10.123.52.76:9093,10.123.52.76:9094
    consumer:
      group-id: myGroup

2.3. Sending and receiving messages

package com.cjs.boot.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MyListener {

    @KafkaListener(topics = "myTopic")
    public void processMessage2(String content) {
        log.info("【Received Message From 'myTopic'】: {}", content);
    }

}

package com.cjs.boot.controller;

import com.cjs.boot.response.RespResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Controller;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.servlet.ModelAndView;

@Controller
@RequestMapping("/message")
public class MessageController extends BaseController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    @GetMapping("/add.html")
    public ModelAndView add() {
        return new ModelAndView("message/add");
    }

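    @PostMapping("/send.json")
    @ResponseBody
    public RespResult send(String text) {
        // send() is asynchronous: the returned future completes when the send succeeds or fails.
        // It is not inspected here, so a failed send is not reported to the caller.
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("myTopic", String.valueOf(System.currentTimeMillis()), text);
        return RespResult.success();
    }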

}

2018-05-04 12:36:59.736  INFO 7552 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
2018-05-04 12:36:59.736  INFO 7552 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
2018-05-04 12:36:59.830  INFO 7552 --- [ntainer#0-0-C-1] com.cjs.boot.message.MyListener          : 【Received Message From 'myTopic'】: 大家好啊
2018-05-04 12:37:24.107  INFO 7552 --- [ntainer#0-0-C-1] com.cjs.boot.message.MyListener          : 【Received Message From 'myTopic'】: 吃饭啦

2.4. Screenshot

[Screenshot: Spring Boot Kafka]