Spring Cloud(7)
续 使用Kafka添加问题到ES
安装Kafka
步骤1:
苍老师网站去下载kafka2.4.1
解压到一个指定目录
步骤2:
配置Zookeeper
在kafka安装目录下打开kafka_2.13-2.4.1\config\zookeeper.properties文件
修改如下
# dataDir=/tmp/zookeeper
dataDir=F:/opt/kafka/zookeeper
步骤3:
配置kafka配置文件指定刚刚设置的zookeeper路径
在kafka安装目录下打开kafka_2.13-2.4.1\config\server.properties文件
将log配置中log.dirs修改为
log.dirs=F:/opt/kafka/zookeeper
步骤4:
启动kafka
win+R 输入Cmd打开Dos界面
进入kafka_2.13-2.4.1\bin\windows目录
输入如下命令
zookeeper-server-start.bat ..\..\config\zookeeper.properties
kafka-server-start.bat ..\..\config\server.properties
输入之后即可以启动kafka
使用SpringBoot测试kafka
在创建一个子项目straw-kafka这个项目只用于测试kafka
SpringBoot也提供了支持kafka的依赖
使用起来相对简单
步骤1
子项目创建完毕之后父子相认
步骤2
添加spring-kafka的依赖
由于测试过程中需要将对象转换成json格式传输到kafka
所以还要导入gson
<!-- Google JSON API -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!-- Kafka API -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
步骤3:
添加配置文件
application.properties
server.port=8082
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=straw
logging.level.cn.tedu.straw.kafka=debug
步骤4:
配置主方法类
@SpringBootApplication
@EnableKafka
@EnableScheduling
public class StrawKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(StrawKafkaApplication.class, args);
}
}
步骤5:
创建传递消息的实体类
@Data
@Accessors(chain = true)
public class DemoMessage implements Serializable {
private String content;
private Integer id;
private Long time;
}
步骤6
编写消息的生产者(发送者)
代码如下
@Component
@Slf4j
public class DemoProdcucer {
//我们依赖了Spring-kafka所以可以直接获得发送信息的对象
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
//我们需要将java对象转换为json格式
private Gson gson=new Gson();
//kafka的发送者设置每10秒发送一次信息
@Scheduled(fixedRate = 1000*10)
public void sendMessage(){
DemoMessage message=new DemoMessage()
.setContent("你好外星人!")
.setId(1000)
.setTime(System.currentTimeMillis());
//将message对象转换成json格式
String json=gson.toJson(message);
log.debug("发送信息:{}",json);
//向kafka发送消息,需要制定话题(Topic)
kafkaTemplate.send("MyTopic",json);
}
}
步骤7:
编写消息的消费者
代码如下
@Component
@Slf4j
public class DemoConsumer {
//消息的接收者需要将消息转成java格式
private Gson gson=new Gson();
//下面是kafka接收消息的方法
//需要指定接收消息的话题
@KafkaListener(topics = "MyTopic")
//上面的kafka监听器监听的话题只要有新的信息出现,就会自动运行这个方法
//下面这个方法的参数是监听器传过来的,类型时固定的
//他代表从kafka中获得的一条消息记录
public void receive(ConsumerRecord<String,String> record){
//将接收到的记录的value取出
String json=record.value();
log.debug("接收信息:{}",json);
//将获得到的json个数字符串转回为java对象
DemoMessage message=gson.fromJson(json,DemoMessage.class);
log.debug("java对象:{}",message);
}
}
使用Kafka重构稻草问答项目
faq向kafka发送数据
首先我们要确定相同的Topic名称
最稳妥的办法是在commons模块中添加一个常量
所以转到commons
步骤1
在vo包中创建一个Topic类,类中添加常量
代码如下
public class Topic {
//定义kafka实现问题传输的常量
public static final String QUESTIONS="search.questions";
}
步骤2:
转到faq模块
配置pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
步骤3:
faq的application.properties文件
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=straw
步骤4
主方法添加启动kafka注解
@SpringBootApplication
@EnableEurekaClient
@MapperScan("cn.tedu.straw.faq.mapper")
@EnableRedisHttpSession
@EnableKafka
public class StrawFaqApplication {
public static void main(String[] args) {
SpringApplication.run(StrawFaqApplication.class, args);
}
@Bean
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
}
}
步骤5:
创建一个类来发送kafka信息
这里新建一个包kafka包中创建这个类
代码如下
@Component
@Slf4j
public class KafkaProducer {
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
private Gson gson=new Gson();
public void sendQuestion(Question question){
log.debug("准备发送问题:{}"+question);
//将问题转换为json格式
String json=gson.toJson(question);
log.debug("开始发送{}"+json);
kafkaTemplate.send(Topic.QUESTIONS,json);
log.debug("发送完成!");
}
}
步骤6:
在业务逻辑层保存问题后,将问题发送给kafka
代码如下QuestionServiceImpl
@Resource
private KafkaProducer kafkaProducer;
@Override
@Transactional
public void saveQuestion(String username, QuestionVo questionVo) {
//此处省略将问题保存到数据库的过程 ...
//将数据发送到straw-search模块
kafkaProducer.sendQuestion(question);
}
search消费(接收)Kafka的数据
转到search模块
faq模块的发送已经完成,下面要开始编写search模块接收数据的代码
search配置三板斧
步骤1:
pom.xml
<!-- Google JSON API -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!-- Kafka API -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
步骤2:
application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=straw
步骤3:
主方法类
@SpringBootApplication
@EnableEurekaClient
@EnableRedisHttpSession
@EnableKafka
public class StrawSearchApplication {
public static void main(String[] args) {
SpringApplication.run(StrawSearchApplication.class, args);
}
@Bean
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
}
}
步骤4:
开始开发search新增一个ES文档的业务逻辑层
先接口
IQuestionService中添加方法如下
//新增QuestionVo对象到ES的方法
void saveQuestion(QuestionVo questionVo);
步骤5:
QuestionServiceImpl实现
@Override
public void saveQuestion(QuestionVo questionVo) {
questionRepository.save(questionVo);
}
步骤6:
我们不使用控制器Controller来调用kafka的内容
而是使用一个Kafka的监听来时刻关注指定的话题有没有消息
如果出现消息那么立即运行指定的方法
创建一个kafka包
在包中创建类KafkaConsumer编写代码如下:
@Component
@Slf4j
public class KafkaConsumer {
private Gson gson=new Gson();
@Resource
private IQuestionService questionService;
@KafkaListener(topics = Topic.QUESTIONS)
public void receiveQuestion(ConsumerRecord<String,String> record) {
String json = record.value();
QuestionVo questionVo = gson.fromJson(json, QuestionVo.class);
log.debug("收到问题{},开始新增", questionVo);
questionService.saveQuestion(questionVo);
}
}
启动全部服务,登录一个学生
发布一个问题
检查数据库是否增加,并按这个问题题目或内容可能出现的分词进行查询
查询出来就表示ES中也添加成功了
稻草问答到此完结散花!!!
本文地址:https://blog.csdn.net/Da_Xia1/article/details/111997217