欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spring整合rabbitmq

程序员文章站 2022-03-07 16:17:30
...

    本文简单记录一下 spring 整合 rabbitmq,此处引入spring boot是为了方便引入和rabbitmq整合相关的jar包,并没有使用spring boot整合 rabbitmq。

 

实现功能

  1. 完成 spring 和 rabbitmq 的整合
  2. 完成使用 rabbitAdmin 创建队列等
  3. 完成使用 @Bean 注解声明队列等
  4. 完成使用 RabbitTemplate 进行发送消息
  5. 使用 SimpleMessageListenerContainer 进行消息的监听,可以对消息进行各种适配等

整合步骤:

1、引入 jar 包。

2、配置 ConnectionFacotry。

3、配置 RabbitAdmin 方便维护队列、交换器、绑定等。

4、配置 RabbitTemplate 方便程序中的消息的发送和接收等。

5、配置 SimpleMessageListenerContainer 方便程序中消息的监听。

 

小知识点:

1、在程序中进行队列、交换器、绑定的声明和使用 @Bean 注解来进行声明,如果要想在程序启动的时候自动创建这些队列等,那么在配置 RabbitAdmin 的时候需要将  autoStartup 属性设置成 true。

2、如果我们在程序运行的过程中需要动态修改 监听的队列或移除队列等,那么可以使用  SimpleMessageListenerContainer 来动态修改这些参数。

3、在如果我们想对接收到RabbitMQ发送的消息进行适配和消息转换等,那么使用SimpleMessageListenerContainer这个可以使用。

 

实现步骤:

1、引入 spring 整合 rabbitmq 整合的 maven 依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2、spring 整合 rabbitmq 的核心配置类

/**
 * rabbitmq 配置
 *
 * @author huan.fu
 * @date 2018/10/17 - 10:57
 */
@Configuration
@Slf4j
public class RabbitmqConfiguration {

	/**
	 * 创建 rabbitmq 连接工厂
	 *
	 * @return
	 */
	@Bean
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
		connectionFactory.setHost("140.143.237.224");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("root");
		connectionFactory.setPassword("root");
		connectionFactory.setVirtualHost("/");
		return connectionFactory;
	}

	/**
	 * rabbitmq 实现 AMQP 便携式的管理操作,比如创建队列、绑定、交换器等
	 *
	 * @param connectionFactory
	 * @return
	 */
	@Bean
	public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
		rabbitAdmin.setAutoStartup(true);
		return rabbitAdmin;
	}

	/**
	 * rabbit mq 模板
	 *
	 * @param connectionFactory
	 * @return
	 */
	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		return new RabbitTemplate(connectionFactory);
	}

	/**
	 * 消息监听容器
	 *
	 * @param connectionFactory
	 * @return
	 */
	@Bean
	public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
		SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
		// 设置监听的队列
		simpleMessageListenerContainer.setQueueNames("queue001", "queue002");
		// 指定要创建的并发使用者的数量,默认值是1,当并发高时可以增加这个的数值,同时下方max的数值也要增加
		simpleMessageListenerContainer.setConcurrentConsumers(3);
		// 最大的并发消费者
		simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
		// 设置是否重回队列
		simpleMessageListenerContainer.setDefaultRequeueRejected(false);
		// 设置签收模式
		simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		// 设置非独占模式
		simpleMessageListenerContainer.setExclusive(false);
		// 设置consumer未被 ack 的消息个数
		simpleMessageListenerContainer.setPrefetchCount(1);
		// 接收到消息的后置处理
		simpleMessageListenerContainer.setAfterReceivePostProcessors((MessagePostProcessor) message -> {
			message.getMessageProperties().getHeaders().put("接收到消息后", "在消息消费之前的一个后置处理");
			return message;
		});
		// 设置 consumer 的 tag
		simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
			private AtomicInteger consumer = new AtomicInteger(1);

			@Override
			public String createConsumerTag(String queue) {
				return String.format("consumer:%s:%d", queue, consumer.getAndIncrement());
			}
		});
		// 设置消息监听器
		simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
			try {
				log.info("============> Thread:[{}] 接收到消息:[{}] ", Thread.currentThread().getName(), new String(message.getBody()));
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			} catch (Exception e) {
				log.error(e.getMessage(), e);
				// 发生异常此处需要捕获到
				channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
			}
		});

		/**  ================ 消息转换器的用法 ================
		 simpleMessageListenerContainer.setMessageConverter(new MessageConverter() {
		 // 将 java 对象转换成 Message 对象
		 @Override public Message toMessage(Object object, MessageProperties messageProperties) {
		 return null;
		 }

		 // 将 message 对象转换成 java 对象
		 @Override public Object fromMessage(Message message) {
		 return null;
		 }
		 });
		 */

		/**  ================ 消息适配器的用法,用于处理各种不同的消息 ================
		 MessageListenerAdapter adapter = new MessageListenerAdapter();
		 // 设置真正处理消息的对象,可以是一个普通的java对象,也可以是 ChannelAwareMessageListener 等
		 adapter.setDelegate(null);
		 adapter.setDefaultListenerMethod("设置上一步中delegate对象中处理的方法名");

		 ContentTypeDelegatingMessageConverter converters = new ContentTypeDelegatingMessageConverter();
		 // 文本装换器
		 MessageConverter txtMessageConvert = null;
		 // json 转换器
		 MessageConverter jsonMessageConvert = null;

		 converters.addDelegate("text", txtMessageConvert);
		 converters.addDelegate("html/text", txtMessageConvert);
		 converters.addDelegate("text/plain", txtMessageConvert);

		 converters.addDelegate("json", jsonMessageConvert);
		 converters.addDelegate("json/*", jsonMessageConvert);
		 converters.addDelegate("application/json", jsonMessageConvert);

		 adapter.setMessageConverter(converters);
		 simpleMessageListenerContainer.setMessageListener(adapter);

		 */
		return simpleMessageListenerContainer;
	}


	@Bean
	public Queue queue003() {
		return new Queue("queue003", false, false, false, null);
	}

	@Bean
	public Exchange exchange003() {
		return new TopicExchange("exchange003", false, false, null);
	}

	@Bean
	public Binding binding003() {
		return new Binding("queue003", Binding.DestinationType.QUEUE, "exchange003", "save.*", null);
	}

}

 3、动态移除和增加对队列的监听

/**
 * 测试动态改变 SimpleMessageListenerContainer 的属性,比如动态增加需要监听的队列等
 *
 * @author huan.fu
 * @date 2018/10/17 - 15:12
 */
@Component
@Slf4j
public class DynamicSimpleMessageListenerContainerTest implements InitializingBean {

	@Autowired
	private ApplicationContext applicationContext;

	@Override
	public void afterPropertiesSet() throws Exception {
		new Thread(() -> {
			try {
				TimeUnit.SECONDS.sleep(10);
				SimpleMessageListenerContainer simpleMessageListenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class);
				log.info("移除对队列:[{}]的监听", "queue001");
				simpleMessageListenerContainer.removeQueueNames("queue001");
				TimeUnit.SECONDS.sleep(5);
				log.info("添加对队列:[{}]的监听", "queue001");
				String[] queueNames = simpleMessageListenerContainer.getQueueNames();
				Arrays.copyOf(queueNames, queueNames.length + 1);
				queueNames[queueNames.length - 1] = "queue001";
				simpleMessageListenerContainer.addQueueNames(queueNames);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}).start();
	}
}

 4、使用 RabbitAdmin 创建队列等

/**
 * 测试 rabbitAdmin
 *
 * @author huan.fu
 * @date 2018/10/17 - 12:54
 */
@Component
@Slf4j
public class RabbitAdminService implements InitializingBean {

	@Autowired
	private RabbitAdmin rabbitAdmin;

	/**
	 * 创建队列
	 *
	 * @param queueName
	 */
	public void createQueue(String queueName) {
		log.info("创建队列:[{}]", queueName);
		rabbitAdmin.declareQueue(new Queue(queueName, false, false, false, null));
	}

	/**
	 * 创建direct交换器
	 *
	 * @param exchangeName
	 */
	public void createDirectExchange(String exchangeName) {
		log.info("创建direct交换器:[{}]", exchangeName);
		rabbitAdmin.declareExchange(new DirectExchange(exchangeName, false, false, null));
	}

	/**
	 * 创建topic交换器
	 *
	 * @param exchangeName
	 */
	public void createTopicExchange(String exchangeName) {
		log.info("创建topic交换器:[{}]", exchangeName);
		rabbitAdmin.declareExchange(new TopicExchange(exchangeName, false, false, null));
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		createQueue("queue001");
		createQueue("queue002");
		createDirectExchange("exchange001");
		createTopicExchange("exchange002");
		// 创建绑定
		rabbitAdmin.declareBinding(new Binding("queue001", Binding.DestinationType.QUEUE, "exchange001", "direct_001", null));
		// 创建绑定
		rabbitAdmin.declareBinding(new Binding("queue002", Binding.DestinationType.QUEUE, "exchange002", "topic.save.#", null));
	}
}

 5、使用 RabbitTemplate 进行发送消息

/**
 * RabbitTemplate测试
 *
 * @author huan.fu
 * @date 2018/10/17 - 14:35
 */
@Component
@Slf4j
public class RabbitTemplateTest implements InitializingBean {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Override
	public void afterPropertiesSet() throws Exception {
		new Thread(() -> {
			try {
				TimeUnit.SECONDS.sleep(5);
				IntStream.rangeClosed(1, 10).forEach(num -> rabbitTemplate.convertAndSend("exchange001", "direct_001", String.format("这个是第[%d]条消息.", num)));
			} catch (InterruptedException e) {
				log.error(e.getMessage(), e);
			}
		}).start();
	}
}

 6、启动类

/**
 * spring 整合 rabbitmq
 *
 * @author huan.fu
 * @date 2018/10/17 - 10:53
 */
@SpringBootApplication
public class RabbitMqApplication {
	public static void main(String[] args) {
		SpringApplication.run(RabbitMqApplication.class, args);
	}
}

 7、运行结果spring整合rabbitmq
            
    
    博客分类: rabbitmq rabbitmqspring整合rabbitmqSimpleMessageListenerContainer 
 

程序代码

代码: https://gitee.com/huan1993/rabbitmq/tree/master/rabbitmq-spring

  • spring整合rabbitmq
            
    
    博客分类: rabbitmq rabbitmqspring整合rabbitmqSimpleMessageListenerContainer 
  • 大小: 172.6 KB