spring整合rabbitmq
程序员文章站
2022-03-07 16:17:30
...
本文简单记录一下 spring 整合 rabbitmq,此处引入spring boot是为了方便引入和rabbitmq整合相关的jar包,并没有使用spring boot整合 rabbitmq。
实现功能
- 完成 spring 和 rabbitmq 的整合
- 完成使用 rabbitAdmin 创建队列等
- 完成使用 @Bean 注解声明队列等
- 完成使用 RabbitTemplate 进行发送消息
- 使用 SimpleMessageListenerContainer 进行消息的监听,可以对消息进行各种适配等
整合步骤:
1、引入 jar 包。
2、配置 ConnectionFacotry。
3、配置 RabbitAdmin 方便维护队列、交换器、绑定等。
4、配置 RabbitTemplate 方便程序中的消息的发送和接收等。
5、配置 SimpleMessageListenerContainer 方便程序中消息的监听。
小知识点:
1、在程序中进行队列、交换器、绑定的声明和使用 @Bean 注解来进行声明,如果要想在程序启动的时候自动创建这些队列等,那么在配置 RabbitAdmin 的时候需要将 autoStartup 属性设置成 true。
2、如果我们在程序运行的过程中需要动态修改 监听的队列或移除队列等,那么可以使用 SimpleMessageListenerContainer 来动态修改这些参数。
3、在如果我们想对接收到RabbitMQ发送的消息进行适配和消息转换等,那么使用SimpleMessageListenerContainer这个可以使用。
实现步骤:
1、引入 spring 整合 rabbitmq 整合的 maven 依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2、spring 整合 rabbitmq 的核心配置类
/** * rabbitmq 配置 * * @author huan.fu * @date 2018/10/17 - 10:57 */ @Configuration @Slf4j public class RabbitmqConfiguration { /** * 创建 rabbitmq 连接工厂 * * @return */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); connectionFactory.setHost("140.143.237.224"); connectionFactory.setPort(5672); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setVirtualHost("/"); return connectionFactory; } /** * rabbitmq 实现 AMQP 便携式的管理操作,比如创建队列、绑定、交换器等 * * @param connectionFactory * @return */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * rabbit mq 模板 * * @param connectionFactory * @return */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } /** * 消息监听容器 * * @param connectionFactory * @return */ @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); // 设置监听的队列 simpleMessageListenerContainer.setQueueNames("queue001", "queue002"); // 指定要创建的并发使用者的数量,默认值是1,当并发高时可以增加这个的数值,同时下方max的数值也要增加 simpleMessageListenerContainer.setConcurrentConsumers(3); // 最大的并发消费者 simpleMessageListenerContainer.setMaxConcurrentConsumers(10); // 设置是否重回队列 simpleMessageListenerContainer.setDefaultRequeueRejected(false); // 设置签收模式 simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置非独占模式 simpleMessageListenerContainer.setExclusive(false); // 设置consumer未被 ack 的消息个数 simpleMessageListenerContainer.setPrefetchCount(1); // 接收到消息的后置处理 simpleMessageListenerContainer.setAfterReceivePostProcessors((MessagePostProcessor) message -> { message.getMessageProperties().getHeaders().put("接收到消息后", "在消息消费之前的一个后置处理"); return message; }); // 设置 consumer 的 tag simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() { private AtomicInteger consumer = new AtomicInteger(1); @Override public String createConsumerTag(String queue) { return String.format("consumer:%s:%d", queue, consumer.getAndIncrement()); } }); // 设置消息监听器 simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { try { log.info("============> Thread:[{}] 接收到消息:[{}] ", Thread.currentThread().getName(), new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.error(e.getMessage(), e); // 发生异常此处需要捕获到 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }); /** ================ 消息转换器的用法 ================ simpleMessageListenerContainer.setMessageConverter(new MessageConverter() { // 将 java 对象转换成 Message 对象 @Override public Message toMessage(Object object, MessageProperties messageProperties) { return null; } // 将 message 对象转换成 java 对象 @Override public Object fromMessage(Message message) { return null; } }); */ /** ================ 消息适配器的用法,用于处理各种不同的消息 ================ MessageListenerAdapter adapter = new MessageListenerAdapter(); // 设置真正处理消息的对象,可以是一个普通的java对象,也可以是 ChannelAwareMessageListener 等 adapter.setDelegate(null); adapter.setDefaultListenerMethod("设置上一步中delegate对象中处理的方法名"); ContentTypeDelegatingMessageConverter converters = new ContentTypeDelegatingMessageConverter(); // 文本装换器 MessageConverter txtMessageConvert = null; // json 转换器 MessageConverter jsonMessageConvert = null; converters.addDelegate("text", txtMessageConvert); converters.addDelegate("html/text", txtMessageConvert); converters.addDelegate("text/plain", txtMessageConvert); converters.addDelegate("json", jsonMessageConvert); converters.addDelegate("json/*", jsonMessageConvert); converters.addDelegate("application/json", jsonMessageConvert); adapter.setMessageConverter(converters); simpleMessageListenerContainer.setMessageListener(adapter); */ return simpleMessageListenerContainer; } @Bean public Queue queue003() { return new Queue("queue003", false, false, false, null); } @Bean public Exchange exchange003() { return new TopicExchange("exchange003", false, false, null); } @Bean public Binding binding003() { return new Binding("queue003", Binding.DestinationType.QUEUE, "exchange003", "save.*", null); } }
3、动态移除和增加对队列的监听
/** * 测试动态改变 SimpleMessageListenerContainer 的属性,比如动态增加需要监听的队列等 * * @author huan.fu * @date 2018/10/17 - 15:12 */ @Component @Slf4j public class DynamicSimpleMessageListenerContainerTest implements InitializingBean { @Autowired private ApplicationContext applicationContext; @Override public void afterPropertiesSet() throws Exception { new Thread(() -> { try { TimeUnit.SECONDS.sleep(10); SimpleMessageListenerContainer simpleMessageListenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class); log.info("移除对队列:[{}]的监听", "queue001"); simpleMessageListenerContainer.removeQueueNames("queue001"); TimeUnit.SECONDS.sleep(5); log.info("添加对队列:[{}]的监听", "queue001"); String[] queueNames = simpleMessageListenerContainer.getQueueNames(); Arrays.copyOf(queueNames, queueNames.length + 1); queueNames[queueNames.length - 1] = "queue001"; simpleMessageListenerContainer.addQueueNames(queueNames); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
4、使用 RabbitAdmin 创建队列等
/** * 测试 rabbitAdmin * * @author huan.fu * @date 2018/10/17 - 12:54 */ @Component @Slf4j public class RabbitAdminService implements InitializingBean { @Autowired private RabbitAdmin rabbitAdmin; /** * 创建队列 * * @param queueName */ public void createQueue(String queueName) { log.info("创建队列:[{}]", queueName); rabbitAdmin.declareQueue(new Queue(queueName, false, false, false, null)); } /** * 创建direct交换器 * * @param exchangeName */ public void createDirectExchange(String exchangeName) { log.info("创建direct交换器:[{}]", exchangeName); rabbitAdmin.declareExchange(new DirectExchange(exchangeName, false, false, null)); } /** * 创建topic交换器 * * @param exchangeName */ public void createTopicExchange(String exchangeName) { log.info("创建topic交换器:[{}]", exchangeName); rabbitAdmin.declareExchange(new TopicExchange(exchangeName, false, false, null)); } @Override public void afterPropertiesSet() throws Exception { createQueue("queue001"); createQueue("queue002"); createDirectExchange("exchange001"); createTopicExchange("exchange002"); // 创建绑定 rabbitAdmin.declareBinding(new Binding("queue001", Binding.DestinationType.QUEUE, "exchange001", "direct_001", null)); // 创建绑定 rabbitAdmin.declareBinding(new Binding("queue002", Binding.DestinationType.QUEUE, "exchange002", "topic.save.#", null)); } }
5、使用 RabbitTemplate 进行发送消息
/** * RabbitTemplate测试 * * @author huan.fu * @date 2018/10/17 - 14:35 */ @Component @Slf4j public class RabbitTemplateTest implements InitializingBean { @Autowired private RabbitTemplate rabbitTemplate; @Override public void afterPropertiesSet() throws Exception { new Thread(() -> { try { TimeUnit.SECONDS.sleep(5); IntStream.rangeClosed(1, 10).forEach(num -> rabbitTemplate.convertAndSend("exchange001", "direct_001", String.format("这个是第[%d]条消息.", num))); } catch (InterruptedException e) { log.error(e.getMessage(), e); } }).start(); } }
6、启动类
/** * spring 整合 rabbitmq * * @author huan.fu * @date 2018/10/17 - 10:53 */ @SpringBootApplication public class RabbitMqApplication { public static void main(String[] args) { SpringApplication.run(RabbitMqApplication.class, args); } }
7、运行结果
程序代码
代码: https://gitee.com/huan1993/rabbitmq/tree/master/rabbitmq-spring