欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spring boot集成RabbitMQ

程序员文章站 2022-06-12 15:37:31
...

spring boot集成RabbitMQ

原理见:https://www.jianshu.com/p/79ca08116d57

安装RabbitMQ略过

1.Springboot引入相关依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.application.properties配置

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

###消费者额外配置
spring.rabbitmq.listener.concurrency=2  //最小消息监听线程数
spring.rabbitmq.listener.max-concurrency=2 //最大消息监听线程数

3.RabbitMQ配置类

@Configuration 
public class RabbitConfig {
/**
 * 声明队列
 * @return
 */
@Bean
public Queue queue() {
    return new Queue("hello.queue1");
}

/**
 * 声明交互器
 * @return
 */
@Bean
public TopicExchange topicExchange(){
    return new TopicExchange("topicExchange");
}

/**
 * 绑定
 * @return
 */
@Bean
public Binding binding(){
    return BindingBuilder.bind(queue()).to(topicExchange()).with("key");
}
    
}

4.消息生产者

@Component
@Slf4j
public class Producer implements RabbitTemplate.ConfirmCallback, ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }
    @Override
    public void handle(Return aReturn) {

        log.info("消息{},发送失败",aReturn.getBody());
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {

        if(ack){
            log.info("消息发送成功:{}",correlationData);
        }else {

            log.info("消息发送失败:{}",s);
        }
    }

    public void send(String message){

        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        log.info("发送消息:{}",message);
        String resp=rabbitTemplate.convertSendAndReceive("topicExchange","key",message,correlationId).toString();
        log.info("消费者响应:{},处理完成",resp);
    }

要点:
1.注入RabbitTemplate
2.实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback接口(非必须)。
ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
3.实现消息发送方法。调用rabbitTemplate相应的方法即可。rabbitTemplate常用发送方法有:

rabbitTemplate.send(message);   //发消息,参数类型为org.springframework.amqp.core.Message
rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送
rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。

5.消息消费者

@Componentpublic class Customer {
    
    @RabbitListener(queues = "hello.queue1")
    public String processMessage1(String msg) {
        System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg);        return msg.toUpperCase();    }
}

要点:
1.监听器参数类型与消息实际类型匹配。在生产者中发送的消息实际类型是String,所以这里监听器参数类型也是String。
2.如果监听器需要有响应返回给生产者,直接在监听方法中return即可。
3.queues必须与生产者中配置的队列名一样。

6.测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqApplicationTests {

    @Autowired
    Producer producer;
    @Test
    public void contextLoads() throws InterruptedException {

        for (int i = 0; i < 100; i++) {
            producer.send("发送第"+i+"条消息");
            Thread.sleep(2000);
        }
    }

}

输出结果如下:
spring boot集成RabbitMQ

相关标签: JAVA 消息队列