使用SpringBoot中提供的的RabbitMQ的API来编写一个消息队列的例子
哎,学习了的东西总是忘记,回去找之前的学习记录或者代码又要看半天才能看懂,那我就自己写个例子来帮助自己记忆那岂不是美滋滋~~~
此例子的场景是,提供者修改数据库中的价格price,并向消息队列中发送消息,将被修改价格的商品主键(id)作为消息传入。消费者监听队列,监听到消息后打印消息主体(id)
1. 创建项目导入依赖
1.1 创建项目
我创建的是maven项目,父项目是rabbitMQ,子项目是provider提供者和consumer消费者。
1.2 导入依赖
导入依赖,最开始我是在rabbitMQ中的pom文件中导入依赖,可是最后发现spring-boot-starter-web依赖与spring-boot-starter-amqp依赖会出现包冲突,尽管我剔除冲突的包也没有解决问题。所有我就分开导入依赖了。
provider的pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
<!-- 还有数据库操作jdbc的相关依赖 -->
consumer的pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
其实是差不多了,至少provider多了数据库操作相关的依赖
2. yaml配置
provider
server:
port: 8088
spring:
rabbitmq:
host: 192.168.106.132 # rabbiymq的IP地址
port: 5672
username: guest
password: guest
template:
exchange: GOODS.EXCHANGE # 设置交换机
datasource:
url: jdbc:mysql://localhost:3306/amqp?serverTimezone=UTC
username: root
password:
consumer
server:
port: 8089
spring:
rabbitmq:
host: 192.168.106.132
port: 5672
username: guest
password: guest
virtual-host: /
3. 代码编写
3.1 provider
提供者主要是接受前台发送的被修改商品的id和修改的价格price,修改完成后,将id作为消息主体发送
到消息队列。
Controller
@Controller
public class ProvController {
@Autowired
private ProvService service;
@RequestMapping("/goods")
public ResponseEntity<Void> update(Long id, Long price){
service.update(id, price);
return ResponseEntity.status(HttpStatus.CREATED).build();
}
}
Service
@Service
public class ProvService {
@Autowired
private ProvMapper mapper;
@Autowired
private AmqpTemplate amqpTemplate;
public void update(Long id, Long price) {
Goods goods = new Goods();
goods.setId(id);
goods.setPrice(price);
mapper.updateByPrimaryKey(goods);
// 向消息队列发送消息,并将修改了价格的商品id作为消息发送
amqpTemplate.convertAndSend("goods.update", id);
System.out.println("发送成功~~~~~~~");
}
}
3.2 consumer
监听消息队列,并打印被修改商品的id
GoodsListener
@Component
public class GoodsListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "GOODS.QUEUE", durable = "true"),
exchange = @Exchange(value = "GOODS.EXCHANGE", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
key = {"goods.update"}
))
public void update(Long id){
// 当从消息队列中获取到消息时,可在此处理逻辑,比如更新静态页面,或者elasticsearch中的数据
System.out.println("被修改商品的id是:"+ id);
}
}
-
@Componet
:类上的注解,注册到Spring容器 -
@RabbitListener
:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:-
bindings
:指定绑定关系,可以有多个。值是@QueueBinding
的数组。@QueueBinding
包含下面属性:-
value
:这个消费者关联的队列。值是@Queue
,代表一个队列- durable: 表示是否持久化的
-
exchange
:队列所绑定的交换机,值是@Exchange
类型- ignoreDeclarationExceptions: 是否忽略交换机已存在的异常
-
key
:队列和交换机绑定的RoutingKey
-
-
类似listen这样的方法在一个类中可以写多个,就代表多个消费者。
4. 测试
第一次测试的时候,需要先运行consumer消费者,因为需要先建立交换机和队列。若没建立就向队列中发送消息会出错。
连接成功
使用postman发现一个get请求,进行测试
201 请求成功,去看看consumer是否打印了1呢
打印成功~~~~
5. 总结
在编写这个小例子的时候还是遇到了一些小问题的,
第一个就是依赖冲突这个问题,真的有点头大。这个对于我这种新手来说就只能百度然后尝试了。
第二个就是我在配置provider的时候,忘记了配置exchange交换机了,所以测试的时候就一直没有把消息发送成功到队列,害我找了大半天,所以干什么都要仔细细心啊~~~