rabbitmq SpringBoot Demo 完美代码
程序员文章站
2022-03-05 18:27:31
...
rabbitmq SpringBoot Demo 完美代码@laowang
rabbitmq客户端配置
代码片
.
#springboot整合rabbitmq的基本配置
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
#springboot整合rabbitmq的消费配置
#最大并发数
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
#手动签收manual,自动签收auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#限流
spring.rabbitmq.listener.simple.prefetch=1
server.servlet.context-path=/
server.port=8322
rabbitmq客户端注解的方式监听
/**
* 注解的方式监听
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true"),
exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
key = "order.*"
))
@RabbitHandler
public void onOrderMessage(
@Payload Order order, // 消息体内容
Channel channel, // 通道
@Headers Map<String,Object> headers // 消息头
) throws Exception{
// 消费者操作
System.out.println("收到消息,开始消费");
System.out.println("订单ID: " + order.getId());
Long deliveryTay = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
// ACK 手工调用 queues 才能消费
channel.basicAck(deliveryTay,false);
}
rabbitmq服务端配置
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
#消息确认模式,消费发出去异步等待broker响应,再写一个监听器监听响应结果
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
server.servlet.context-path=/
server.port=8321
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&rewriteBatchedStatements=TRUE&allowMultiQueries=true
spring.datasource.username=root
spring.datasource.password=mima123
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
我用的是druid连接池,下面是druid配置
# 下面为连接池的补充设置,应用到上面所有数据源中
# 初始化大小,最小,最大
spring.datasource.initialSize=5
spring.datasource.minIdle=10
spring.datasource.maxActive=300
# 配置获取连接等待超时的时间
spring.datasource.maxWait=60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
spring.datasource.timeBetweenEvictionRunsMillis=60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
spring.datasource.minEvictableIdleTimeMillis=300000
spring.datasource.validationQuery=SELECT 1 FROM DUAL
spring.datasource.testWhileIdle=true
spring.datasource.testOnBorrow=false
spring.datasource.testOnReturn=false
# 打开PSCache,并且指定每个连接上PSCache的大小
spring.datasource.poolPreparedStatements=true
spring.datasource.maxPoolPreparedStatementPerConnectionSize=20
# 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
spring.datasource.filters=stat,wall,log4j
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# 合并多个DruidDataSource的监控数据
spring.datasource.useGlobalDataSourceStat=true
rabbitmq发送消息方法调用:构建自定义对象消息
rivate static Logger logger = LoggerFactory.getLogger(RabbitOrderSender.class);
/**
* 自动注入RabbitTemplate模板类
*/
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
BrokerMessageLogMapper brokerMessageLogMapper;
/**
* 回调函数:confirm确实
*/
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
logger.info("correlationData : {}" , correlationData);
String messageId = correlationData.getId();
if (ack) {
// 如果confirm返回成功 则进行更新
BrokerMessageLog log = new BrokerMessageLog();
log.setMessageId(correlationData.getId());
log.setStatus(Constants.ORDER_SEND_SUCCESS);
log.setCreateTime(new Date());
brokerMessageLogMapper.updateByPrimaryKeySelective(log);
} else {
// 失败则进行具体的后续操作,重试或者补偿等手段
logger.info("异常处理流程。。。");
}
}
};
/**
* 发送消息方法调用:构建自定义对象消息
* @param order
* @throws Exception
*/
public void sendOrder(Order order) throws Exception {
/** 发消息之前绑定一个异步监听的函数,指定消息唯一id 进行发送*/
rabbitTemplate.setConfirmCallback(confirmCallback);
CorrelationData correlationData = new CorrelationData();
/**消息唯一ID*/
correlationData.setId(order.getMessageId());
rabbitTemplate.convertAndSend(
"order-exchange",
"order.abcd",
order, // 消息体内容
correlationData // 消息唯一ID
);
}
上一篇: 前端框架有哪些
下一篇: 移动端H5页面实现生成图片的代码