欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spring Cloud(7)

程序员文章站 2022-03-27 07:58:10
续 使用Kafka添加问题到ES安装Kafka步骤1:苍老师网站去下载kafka2.4.1解压到一个指定目录步骤2:配置Zookeeper在kafka安装目录下打开kafka_2.13-2.4.1\config\zookeeper.properties文件修改如下# dataDir=/tmp/zookeeperdataDir=F:/opt/kafka/zookeeper步骤3:配置kafka配置文件指定刚刚设置的zookeeper路径在kafka安装目录下打开kafka_2.13...

续 使用Kafka添加问题到ES

安装Kafka

步骤1:

苍老师网站去下载kafka2.4.1

解压到一个指定目录

步骤2:

配置Zookeeper

在kafka安装目录下打开kafka_2.13-2.4.1\config\zookeeper.properties文件

修改如下

# dataDir=/tmp/zookeeper
dataDir=F:/opt/kafka/zookeeper

步骤3:

配置kafka配置文件指定刚刚设置的zookeeper路径

在kafka安装目录下打开kafka_2.13-2.4.1\config\server.properties文件

将log配置中log.dirs修改为

log.dirs=F:/opt/kafka/zookeeper

步骤4:

启动kafka

win+R 输入Cmd打开Dos界面

进入kafka_2.13-2.4.1\bin\windows目录

输入如下命令

zookeeper-server-start.bat ..\..\config\zookeeper.properties
kafka-server-start.bat ..\..\config\server.properties

输入之后即可以启动kafka

使用SpringBoot测试kafka

在创建一个子项目straw-kafka这个项目只用于测试kafka

SpringBoot也提供了支持kafka的依赖

使用起来相对简单

步骤1

子项目创建完毕之后父子相认

步骤2

添加spring-kafka的依赖

由于测试过程中需要将对象转换成json格式传输到kafka

所以还要导入gson

<!-- Google JSON API -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>
<!-- Kafka API -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

步骤3:

添加配置文件

application.properties

server.port=8082

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=straw

logging.level.cn.tedu.straw.kafka=debug

步骤4:

配置主方法类

@SpringBootApplication
@EnableKafka
@EnableScheduling
public class StrawKafkaApplication {


    public static void main(String[] args) {
        SpringApplication.run(StrawKafkaApplication.class, args);
    }

}

步骤5:

创建传递消息的实体类

@Data
@Accessors(chain = true)
public class DemoMessage implements Serializable {

    private String content;
    private Integer id;
    private Long time;
}

步骤6

编写消息的生产者(发送者)

代码如下

@Component
@Slf4j
public class DemoProdcucer {

    //我们依赖了Spring-kafka所以可以直接获得发送信息的对象
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    //我们需要将java对象转换为json格式
    private Gson gson=new Gson();
    //kafka的发送者设置每10秒发送一次信息
    @Scheduled(fixedRate = 1000*10)
    public void sendMessage(){
        DemoMessage message=new DemoMessage()
                .setContent("你好外星人!")
                .setId(1000)
                .setTime(System.currentTimeMillis());
        //将message对象转换成json格式
        String json=gson.toJson(message);
        log.debug("发送信息:{}",json);

        //向kafka发送消息,需要制定话题(Topic)
        kafkaTemplate.send("MyTopic",json);

    }
}

步骤7:

编写消息的消费者

代码如下

@Component
@Slf4j
public class DemoConsumer {

    //消息的接收者需要将消息转成java格式
    private Gson gson=new Gson();

    //下面是kafka接收消息的方法
    //需要指定接收消息的话题
    @KafkaListener(topics = "MyTopic")
    //上面的kafka监听器监听的话题只要有新的信息出现,就会自动运行这个方法
    //下面这个方法的参数是监听器传过来的,类型时固定的
    //他代表从kafka中获得的一条消息记录
    public void receive(ConsumerRecord<String,String> record){
        //将接收到的记录的value取出
        String json=record.value();
        log.debug("接收信息:{}",json);
        //将获得到的json个数字符串转回为java对象
        DemoMessage message=gson.fromJson(json,DemoMessage.class);
        log.debug("java对象:{}",message);

    }
}

使用Kafka重构稻草问答项目

Spring Cloud(7)

faq向kafka发送数据

首先我们要确定相同的Topic名称

最稳妥的办法是在commons模块中添加一个常量

所以转到commons

步骤1

在vo包中创建一个Topic类,类中添加常量

代码如下

public class Topic {

    //定义kafka实现问题传输的常量
    public static  final String QUESTIONS="search.questions";

}

步骤2:

转到faq模块

配置pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

步骤3:

faq的application.properties文件

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=straw

步骤4

主方法添加启动kafka注解

@SpringBootApplication
@EnableEurekaClient
@MapperScan("cn.tedu.straw.faq.mapper")
@EnableRedisHttpSession
@EnableKafka
public class StrawFaqApplication {

    public static void main(String[] args) {
        SpringApplication.run(StrawFaqApplication.class, args);
    }

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate(){
        return new RestTemplate();
    }
}

步骤5:

创建一个类来发送kafka信息

这里新建一个包kafka包中创建这个类

代码如下

@Component
@Slf4j
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    private Gson gson=new Gson();

    public void sendQuestion(Question question){
        log.debug("准备发送问题:{}"+question);
        //将问题转换为json格式
        String json=gson.toJson(question);
        log.debug("开始发送{}"+json);
        kafkaTemplate.send(Topic.QUESTIONS,json);
        log.debug("发送完成!");
    }
}

步骤6:

在业务逻辑层保存问题后,将问题发送给kafka

代码如下QuestionServiceImpl

@Resource
private KafkaProducer kafkaProducer;
@Override
@Transactional 
public void saveQuestion(String username, QuestionVo questionVo) {
    //此处省略将问题保存到数据库的过程 ...
    
    //将数据发送到straw-search模块
    kafkaProducer.sendQuestion(question);
}

search消费(接收)Kafka的数据

转到search模块

faq模块的发送已经完成,下面要开始编写search模块接收数据的代码

search配置三板斧

步骤1:

pom.xml

<!-- Google JSON API -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
        <!-- Kafka API -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

步骤2:

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=straw

步骤3:

主方法类

@SpringBootApplication
@EnableEurekaClient
@EnableRedisHttpSession
@EnableKafka
public class StrawSearchApplication {


    public static void main(String[] args) {
        SpringApplication.run(StrawSearchApplication.class, args);
    }

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate(){
        return new RestTemplate();
    }

}

步骤4:

开始开发search新增一个ES文档的业务逻辑层

先接口

IQuestionService中添加方法如下

//新增QuestionVo对象到ES的方法
    void saveQuestion(QuestionVo questionVo);

步骤5:

QuestionServiceImpl实现

@Override
    public void saveQuestion(QuestionVo questionVo) {
        questionRepository.save(questionVo);
    }

步骤6:

我们不使用控制器Controller来调用kafka的内容

而是使用一个Kafka的监听来时刻关注指定的话题有没有消息

如果出现消息那么立即运行指定的方法

创建一个kafka包

在包中创建类KafkaConsumer编写代码如下:

@Component
@Slf4j
public class KafkaConsumer {

    private Gson gson=new Gson();

    @Resource
    private IQuestionService questionService;

    @KafkaListener(topics = Topic.QUESTIONS)
    public void receiveQuestion(ConsumerRecord<String,String> record) {
        String json = record.value();
        QuestionVo questionVo = gson.fromJson(json, QuestionVo.class);
        log.debug("收到问题{},开始新增", questionVo);
        questionService.saveQuestion(questionVo);
    }
}

启动全部服务,登录一个学生

发布一个问题

检查数据库是否增加,并按这个问题题目或内容可能出现的分词进行查询

查询出来就表示ES中也添加成功了

稻草问答到此完结散花!!!

本文地址:https://blog.csdn.net/Da_Xia1/article/details/111997217

相关标签: Spring Cloud java