SpringBoot整合RabbitMQ Demo
程序员文章站
2022-07-12 12:28:28
...
引入pom
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
修改application.yml
生产端配置
spring:
rabbitmq:
address: 192.168.0.1:5672,192.168.0.2:5672,192.168.0.3:5672
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
#启用消息确认模式
publisher-confirms: true
#设置return消息模式,要和mandatory一起配合使用
publisher-returns: true
template:
mandatory: true
消费端配置
spring:
rabbitmq:
address: 192.168.0.1:5672,192.168.0.2:5672,192.168.0.3:5672
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
listener:
simple:
#消费者消费成功消息以后需要手工进行签收(ack),默认为auto
acknowledge-mode: manual
concurrency: 1
max-concurrency: 5
prefetch: 1
生产端Java代码
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 确认消息的回调监听接口,用于确认消息是否被broker所收到
*/
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
/***
*
* @param correlationData 作为一个唯一的标识
* @param ack broker 是否落盘成功
* @param cause 失败的一些异常信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("消息ACK结果"+ack);
}
};
/**
* 对外发送消息的方法
* @param message 具体的消息内容
* @param properties 额外的附加属性
*/
public void send(Object message, Map<String,Object> properties){
MessageHeaders mhs = new MessageHeaders(properties);
Message<?> msg = MessageBuilder.createMessage(message,mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
//指定业务唯一的ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
MessagePostProcessor mpp = new MessagePostProcessor() {
@Override
public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
System.out.println("--> post to do:" + message);
return message;
}
};
rabbitTemplate.convertAndSend("exchange-1",
"springboot.rabbit",
msg,mpp,correlationData);
}
}
消费端Java代码
@Component
public class RabbitReceiver {
//以下配置信息可以写在yml里再进行读取
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1",durable = "true"),
exchange = @Exchange(name="exchange-1",
durable = "true",type = "topic",ignoreDeclarationExceptions = "true"),
key="springboot.*"
)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws IOException {
//收到消息进行业务消息处理
System.out.println("----------");
System.out.println("消费消息:"+message.getPayload());
//手工ack
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag,false);
}
}
上一篇: RabbitMQ demo
推荐阅读
-
SpringBoot2 整合 Swagger2
-
SpringBoot 如何整合 ES 实现 CRUD 操作
-
springboot整合shiro多验证登录功能的实现(账号密码登录和使用手机验证码登录)
-
SpringBoot整合Druid、Redis的示例详解
-
SpringBoot无废话入门04:MyBatis整合
-
MyEclipse下SpringBoot+JSP整合过程及踩坑
-
SpringBoot轻松整合MongoDB的全过程记录
-
springboot、mybatisplus框架整合搭建
-
springboot整合MybatisPlus基本使用
-
SpringBoot整合MyBatis获得插入数据后获取主键,返回值总是1