欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rabbitmq SpringBoot Demo 完美代码

程序员文章站 2022-03-05 18:27:31
...

rabbitmq SpringBoot Demo 完美代码@laowang

rabbitmq客户端配置

代码片.

#springboot整合rabbitmq的基本配置
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

#springboot整合rabbitmq的消费配置

#最大并发数
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

#手动签收manual,自动签收auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#限流
spring.rabbitmq.listener.simple.prefetch=1


server.servlet.context-path=/
server.port=8322

rabbitmq客户端注解的方式监听

    /**
     * 注解的方式监听
     */
    @RabbitListener(bindings = @QueueBinding(
                    value = @Queue(value = "order-queue",durable = "true"),
                    exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
                    key = "order.*"
    ))
    @RabbitHandler
    public void onOrderMessage(
            @Payload Order order,   // 消息体内容
            Channel channel,        // 通道
            @Headers Map<String,Object> headers // 消息头
            ) throws Exception{

        // 消费者操作
        System.out.println("收到消息,开始消费");
        System.out.println("订单ID: " + order.getId());

        Long deliveryTay = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);

        // ACK 手工调用 queues 才能消费
        channel.basicAck(deliveryTay,false);
    }

rabbitmq服务端配置

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

#消息确认模式,消费发出去异步等待broker响应,再写一个监听器监听响应结果
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

server.servlet.context-path=/
server.port=8321

spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&rewriteBatchedStatements=TRUE&allowMultiQueries=true
spring.datasource.username=root
spring.datasource.password=mima123
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource

我用的是druid连接池,下面是druid配置


# 下面为连接池的补充设置,应用到上面所有数据源中

# 初始化大小,最小,最大
spring.datasource.initialSize=5
spring.datasource.minIdle=10
spring.datasource.maxActive=300

# 配置获取连接等待超时的时间
spring.datasource.maxWait=60000

# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
spring.datasource.timeBetweenEvictionRunsMillis=60000

# 配置一个连接在池中最小生存的时间,单位是毫秒
spring.datasource.minEvictableIdleTimeMillis=300000
spring.datasource.validationQuery=SELECT 1 FROM DUAL
spring.datasource.testWhileIdle=true
spring.datasource.testOnBorrow=false
spring.datasource.testOnReturn=false

# 打开PSCache,并且指定每个连接上PSCache的大小
spring.datasource.poolPreparedStatements=true
spring.datasource.maxPoolPreparedStatementPerConnectionSize=20

# 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
spring.datasource.filters=stat,wall,log4j

# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000

# 合并多个DruidDataSource的监控数据
spring.datasource.useGlobalDataSourceStat=true

rabbitmq发送消息方法调用:构建自定义对象消息

rivate static Logger logger = LoggerFactory.getLogger(RabbitOrderSender.class);

    /**
     * 自动注入RabbitTemplate模板类
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    BrokerMessageLogMapper brokerMessageLogMapper;


    /**
     * 回调函数:confirm确实
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
            logger.info("correlationData : {}" , correlationData);
            String messageId = correlationData.getId();
            if (ack) {
                // 如果confirm返回成功 则进行更新
                BrokerMessageLog log = new BrokerMessageLog();
                log.setMessageId(correlationData.getId());
                log.setStatus(Constants.ORDER_SEND_SUCCESS);
                log.setCreateTime(new Date());
                brokerMessageLogMapper.updateByPrimaryKeySelective(log);
            } else {
                // 失败则进行具体的后续操作,重试或者补偿等手段
                logger.info("异常处理流程。。。");
            }


        }
    };


    /**
     * 发送消息方法调用:构建自定义对象消息
     * @param order
     * @throws Exception
     */
    public void sendOrder(Order order) throws Exception {

        /** 发消息之前绑定一个异步监听的函数,指定消息唯一id 进行发送*/
        rabbitTemplate.setConfirmCallback(confirmCallback);

        CorrelationData correlationData = new CorrelationData();
        /**消息唯一ID*/
        correlationData.setId(order.getMessageId());


        rabbitTemplate.convertAndSend(
                "order-exchange",
                "order.abcd",
                order,  // 消息体内容
                correlationData // 消息唯一ID
                );
    }
相关标签: rabbitmq