SpringBoot ~ 整合AMQP(RabbitMQ)
-
添加pom依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
application.properties配置
spring.rabbitmq.host=***.***.***.*** spring.rabbitmq.port=5762 spring.rabbitmq.username=admin spring.rabbitmq.password=***
-
在RabbitMQ中所有的消息生产者提交的消息都会交由Exchange进行再分配,Exchange会根据不同的策略将消息分发到不同的Queue中。RabbitMQ提供了4种不同策略,分别是Direct、Fanout、Topic、Header,4种策略中前三种使用率较高
-
Direct
DirectExchange的路由策略是将消息队列绑定到一个DirectExchange上,但一条消息到达DirectExchange时会被转发到与该条消息routing key相同的Queue上
-
DirectExchange的配置如下:
/** * @author wsyjlly * @create 2019.07.17 - 21:33 **/ @Configuration public class RabbitDirectConfig { public final static String DIRECTNAME = "ysw-direct"; @Bean Queue queue1(){ return new Queue("queue-direct1"); } @Bean Queue queue2(){ return new Queue("queue-direct2"); } @Bean Queue queue3(){ return new Queue("queue-direct3"); } @Bean DirectExchange directExchange(){ return new DirectExchange(DIRECTNAME,true,false); } @Bean Binding binding1(){ return BindingBuilder.bind(queue1()).to(directExchange()).with("direct1"); } @Bean Binding binding2(){ return BindingBuilder.bind(queue2()).to(directExchange()).with("direct2"); } }
DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,之配置一个Queue的实例即可
-
配置消费者
/** * @author wsyjlly * @create 2019.07.17 - 21:42 **/ @Component public class DirectReceiver { Logger logger= LoggerFactory.getLogger(getClass()); @RabbitListener(queues = "queue-direct1") public void directHandler1(String msg){ logger.info("\033[30;4m"+"queue-direct1:"+msg+"\033[0m"); } @RabbitListener(queues = "queue-direct2") public void directHandler2(String msg){ logger.info("\033[30;4m"+"queue-direct2:"+msg+"\033[0m"); } @RabbitListener(queues = "queue-direct3") public void directHandler3(String msg){ logger.info("\033[30;4m"+"queue-direct3:"+msg+"\033[0m"); } }
通过@RabbitListener注解指定一个方法是一个消费者方法,方法参数就是所接收的消息。
-
消息发送
通过注入RabbitTemplate对象来进行消息发送,在这里我通过定时任务使其自定发送,须开启定时任务,详细操作可查看<SpringBoot定时任务@Schedule>一节
/** * @author wsyjlly * @create 2019.07.18 - 1:13 **/ @Component public class RabbitmqSchedule { @Autowired RabbitTemplate rabbitTemplate; Logger logger = LoggerFactory.getLogger(getClass()); @Scheduled(fixedDelay = 5000,initialDelay = 3000) public void direct(){ String message = "direct-task"; logger.info("\033[30;4m"+message+"\033[0m"); rabbitTemplate.convertAndSend("ysw-direct","direct1",message); rabbitTemplate.convertAndSend("ysw-direct","direct2",message); rabbitTemplate.convertAndSend("queue-direct3",message); } }
-
-
Fanout
FanoutExchange的数据交换策略是把所有到达FanoutExchange的消息转发给所有与他绑定的Queue,在这种策略中,routingkey将不起作用。
-
FanoutExchange的配置方式如下:
/** * @author wsyjlly * @create 2019.07.17 - 21:33 **/ @Configuration public class RabbitFanoutConfig { public final static String FANOUTNAME = "ysw-fanout"; @Bean Queue queue4(){ return new Queue("queue-fanout1"); } @Bean Queue queue5(){ return new Queue("queue-fanout2"); } @Bean Queue queue6(){ return new Queue("queue-fanout3"); } @Bean FanoutExchange fanoutExchange(){ return new FanoutExchange(FANOUTNAME,true,false); } @Bean Binding binding4(){ return BindingBuilder.bind(queue4()).to(fanoutExchange()); } @Bean Binding binding5(){ return BindingBuilder.bind(queue5()).to(fanoutExchange()); } @Bean Binding binding6(){ return BindingBuilder.bind(queue6()).to(fanoutExchange()); } }
-
配置消费者
/** * @author wsyjlly * @create 2019.07.17 - 21:42 **/ @Component public class FanoutReceiver { Logger logger= LoggerFactory.getLogger(getClass()); @RabbitListener(queues = "queue-fanout1") public void fanoutHandler1(String msg){ logger.info("\033[31;4m"+"queue-fanout1:"+msg+"\033[0m"); } @RabbitListener(queues = "queue-fanout2") public void fanoutHandler2(String msg){ logger.info("\033[31;4m"+"queue-fanout2:"+msg+"\033[0m"); } @RabbitListener(queues = "queue-fanout3") public void fanoutHandler3(String msg){ logger.info("\033[31;4m"+"queue-fanout3:"+msg+"\033[0m"); } }
-
消息发送
/** * @author wsyjlly * @create 2019.07.18 - 1:13 **/ @Component public class RabbitmqSchedule { @Autowired RabbitTemplate rabbitTemplate; Logger logger = LoggerFactory.getLogger(getClass()); Scheduled(fixedDelay = 5000,initialDelay = 4000) public void fanout(){ String message = "fanout-task"; logger.info("\033[31;4m"+message+"\033[0m"); rabbitTemplate.convertAndSend("ysw-fanout",null,message); } }
-
-
Topic
TopicExchange是比较复杂也比较灵活的一种路由策略,在TopicExchange中,Queue通过routingkey绑定到TopicExchange上,当消息发送到TopicExchange后,TopicExchange根据消息的routingkey将消息路由到一个或多个Queue上。
-
TopicExchange配置如下:
/** * @author wsyjlly * @create 2019.07.17 - 21:33 **/ @Configuration public class RabbitTopicConfig { public final static String TOPIC_NAME = "ysw-topic"; @Bean Queue queue7(){ return new Queue("queue-topic1"); } @Bean Queue queue8(){ return new Queue("queue-topic2"); } @Bean Queue queue9(){ return new Queue("queue-topic3"); } @Bean TopicExchange topicExchange(){ return new TopicExchange(TOPIC_NAME,true,false); } @Bean Binding binding7(){ /* * 匹配规则 * 绑定键binding key也必须是这种形式。以特定路由键发送的消息将会发送到所有绑定键与之匹配的队列中。但绑定键有两种特殊的情况: * 绑定键binding key也必须是这种形式。以特定路由键发送的消息将会发送到所有绑定键与之匹配的队列中。但绑定键有两种特殊的情况: * ①*(星号)仅代表一个单词 * ②#(井号)代表任意个单词 **/ return BindingBuilder.bind(queue7()).to(topicExchange()).with("#.topic1"); } @Bean Binding binding8(){ return BindingBuilder.bind(queue8()).to(topicExchange()).with("topic2.#"); } @Bean Binding binding9(){ return BindingBuilder.bind(queue9()).to(topicExchange()).with("#.topic3.*"); } }
-
配置消费者
/** * @author wsyjlly * @create 2019.07.17 - 21:42 **/ @Component public class TopicReceiver { Logger logger= LoggerFactory.getLogger(getClass()); @RabbitListener(queues = "queue-topic1") public void topicHandler1(String msg){ logger.info("\033[32;4m"+"queue-topic1:"+msg+"\033[0m"); } @RabbitListener(queues = "queue-topic2") public void topicHandler2(String msg){ logger.info("\033[32;4m"+"queue-topic2:"+msg+"\033[0m"); } @RabbitListener(queues = "queue-topic3") public void topicHandler3(String msg){ logger.info("\033[32;4m"+"queue-topic3:"+msg+"\033[0m"); } }
-
消息发送
/** * @author wsyjlly * @create 2019.07.18 - 1:13 **/ @Component public class RabbitmqSchedule { @Autowired RabbitTemplate rabbitTemplate; Logger logger = LoggerFactory.getLogger(getClass()); @Scheduled(cron = "0-30/6 * * * * ?") public void topic(){ String message = "topic-task"; int i = 0; logger.info("\033[32;4m"+message+"\033[0m"); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic1.news",message + 1);//topic1 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic1.salary",message + 2);//topic1 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic2.news",message + 3);//topic2 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic2.item",message + 4);//topic2 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic2.sth.topic1",message + 5);//topic2&topic1 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic1.sth.topic2",message + 6);//topic2&topic1 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic3",message + 7);//topic3 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic3.news",message + 8);//topic3 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic1.topic3",message + 9); //topic1&topic3 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic2.topic3",message + 10);//topic2&topic3 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic3.topic1",message + 11);//topic3&topic1 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic2.topic3.topic1",message + 12);//topic1&topic2&topic3 } }
-
-
Header
HeaderExchange是一种较少使用的路由策略,HeaderExchange会根据消息的Header将消息路由到不同的Queue上,这种策略也和routingkey无关。
-
HeaderExchange配置如下:
/** * @author wsyjlly * @create 2019.07.17 - 21:33 **/ @Configuration public class RabbitHeaderConfig { public final static String HEADER_NAME = "ysw-header"; @Bean Queue queue10(){ return new Queue("queue-header1"); } @Bean Queue queue11(){ return new Queue("queue-header2"); } @Bean Queue queue12(){ return new Queue("queue-header3"); } @Bean HeadersExchange headersExchange(){ return new HeadersExchange(HEADER_NAME,true,false); } @Bean Binding binding10(){ Map<String,Object> map = new HashMap<>(); map.put("age", "18"); map.put("name", "ysw"); return BindingBuilder.bind(queue10()).to(headersExchange()).whereAny(map).match(); } @Bean Binding binding11(){ Map<String,Object> map = new HashMap<>(); map.put("name", "ysw"); return BindingBuilder.bind(queue11()).to(headersExchange()).where("age").exists(); } @Bean Binding binding12(){ Map<String,Object> map = new HashMap<>(); map.put("age", "18"); map.put("name", "ysw"); return BindingBuilder.bind(queue12()).to(headersExchange()).whereAll(map).match(); } }
Binding配置注释:whereAny表示消息的Header中只要有一个Header匹配上map中的key/value,就把该消息路由到名为“queue-header1”的Queue上;whereAll方法表示消息的所有Header都要匹配,才将消息路由到名为“queue-header2”的Queue上;where表示只要消息的header中包含age,无论age值为多少,都将消息路由到名为“queue-header2”的Queue上。
-
配置消费者
/** * @author wsyjlly * @create 2019.07.17 - 21:42 **/ @Component public class HeaderReceiver { Logger logger= LoggerFactory.getLogger(getClass()); @RabbitListener(queues = "queue-header1") public void headerHandler1(byte[] msg){ logger.info("\033[33;4m"+"queue-header1:"+new String(msg,0,msg.length)+"\033[0m"); } @RabbitListener(queues = "queue-header2") public void headerHandler2(byte[] msg){ logger.info("\033[33;4m"+"queue-header2:"+new String(msg,0,msg.length)+"\033[0m"); } @RabbitListener(queues = "queue-header3") public void headerHandler3(byte[] msg){ logger.info("\033[33;4m"+"queue-header3:"+new String(msg,0,msg.length)+"\033[0m"); } }
-
消息发送
/** * @author wsyjlly * @create 2019.07.18 - 1:13 **/ @Component public class RabbitmqSchedule { @Autowired RabbitTemplate rabbitTemplate; Logger logger = LoggerFactory.getLogger(getClass()); @Scheduled(cron = "0-30/3 * * * * ?") public void header(){ String message = "header-task"; logger.info("\033[33;4m"+message+"\033[0m"); Message message1 = MessageBuilder.withBody("name=name".getBytes()) .setHeader("name", "aaa").build(); Message message2 = MessageBuilder.withBody("name=ysw".getBytes()) .setHeader("name", "ysw").build(); Message message3 = MessageBuilder.withBody("age=19".getBytes()) .setHeader("age", "19").build(); Message message4 = MessageBuilder.withBody("age=18".getBytes()) .setHeader("age", "18").build(); Message message5 = MessageBuilder.withBody("name=ysw&age=18".getBytes()) .setHeader("name", "ysw").setHeader("age","18").build(); Message message6 = MessageBuilder.withBody("name=ysw&age=19".getBytes()) .setHeader("name", "ysw").setHeader("age","19").build(); Message message7 = MessageBuilder.withBody("name=aaa&age=18".getBytes()) .setHeader("name", "aaa").setHeader("age","18").build(); rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null,message1); rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null,message2); rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null,message3); rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null,message4); rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null,message5); rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null,message6); rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null,message7); } }
-
-
上一篇: Spring整合RabbitMQ实战配置
下一篇: Token
推荐阅读