欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

荐 springboot整合消息服务

程序员文章站 2022-04-09 23:36:08
springboot整合activeMq、RabbitMQ...

springboot整合activeMq

ActiveMq是Apache提供的开源消息系统采用java实现,

很好地支持JMS(Java Message Service,即Java消息服务) 规范

ActiveMq安装:http://activemq.apache.org/components/classic/download/ 在官网下载安装对应的版本

下载完成后解压就可以使用

ActiveMq默认的端口号是8161,用户名和密码都是admin 在本机可以使用http://localhost:8161 去访问

springboot整合ActiveMq

1、导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

2、在properties文件中配置activeMq

spring.activemq.broker-url=tcp://localhost:61616
#如果是点对点(queue),那么此处默认应该是false,如果发布订阅,那么一定设置为true
spring.activemq.packages.trust-all=true
spring.activemq.user=admin
spring.activemq.password=admin

3、编写queue(队列)

@Component
public class QueueBean{
    //创建一个队列实例
    @Bean
    Queue queue(){
        //这里设置的消息是队列的名称
        return new ActiveMQQueue("hello.javaboy");
    }
}

4、创建消息的发送者以及消费者

@Component
public class JmsComponent{
    //springboot提供的消息模板
    @Autowired
    JmsMessagingTemplate jmsMessagingTemplate;
    //自己创建的队列实例
    @Autowired
    Queue queue;
    /**
     * 发送消息
     * @param message
     */
    public void send(Message message){
        jmsMessagingTemplate.convertAndSend(this.queue,message);
    }
    /**
     * 接收消息
     * @param message
     */
    //表示监听该队列名称发来的消息
    @JmsListener(destination = "hello.javaboy")
    public void readMessage(Message message){
        System.out.println(message);
    }
​
}

5、上述Message实体类

public class Message implements Serializable {
    private String content;//消息主体
    private Date sendDate;//消息发送的时间
    //省略get、set、tostring方法
}

6、进行消息的发送以及消费

在测试类中注入JmsComponent 调用send()方法进行消息的转发

@SpringBootTest
class ActivemqApplicationTests {
    @Autowired
    JmsComponent jmsComponent;
    @Test
    void contextLoads() {
        Message message = new Message();
        message.setContent("hello activeMq");
        message.setSendDate(new Date());
        jmsComponent.send(message);
    }
}

首先启动项目,在运行测试类进行消息发送:

控制台会打印消息内容:

荐
                                                        springboot整合消息服务

 

springboot整合RabbitMQ

rabbitmq安装比较繁琐,这里使用docker容器进行安装,docker安装非常方便,一条命令全部搞定

通过docker安装rabbitmq

-P(大p)表示自动映射到主机端口

docker run -d --hostname my-rabbitmq --name some-rabbitmq -P rabbitmq:3-management

springboot整合RabbitMQ:

首先导入依赖

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

编写配置文件:

#配置rabbitMQ
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=32771

 

RabbitMQ 四种交换模式:

  • 直连交换机:Direct exchange

  • 扇形交换机:Fanout exchange

  • 主体交换机:Topic exchange

  • 首部交换机:Headers exchange

下面分别介绍4中交换模式:

1、Direct exchange

Direct策略(只转发给routingKey相匹配的用户)

配置RabbitDirectConfig:

//Direct策略(只转发给routingKey相匹配的用户)
@Configuration
public class RabbitDirectConfig {
    public final static String DIRECTNAME = "javaboy-direct";
    //消息队列
    @Bean
    Queue queue(){
        //name值为队列名称,routingKey会与他进行匹配
        return new Queue("hello.RabbitMQ");
    }
    @Bean
    Queue queue1(){
        return new Queue("hello.RabbitMQ1");
    }
    @Bean
    DirectExchange directExchange(){
        //第一个参数为DIRECTNAME、第二个参数表示重启后是否有效,第三参数表示长时间未使用是否删除
        return new DirectExchange(DIRECTNAME,true,false);
    }
    @Bean
    Binding binding(){
        //将队列queue和DirectExchange绑定在一起
        return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
    }
    @Bean
    Binding binding1(){
        //将队列queue和DirectExchange绑定在一起
        return BindingBuilder.bind(queue1()).to(directExchange()).with("direct");
    }
​
}

2、配置消费者DirectReceiver:

//配置消费者
@Component
public class DirectReceiver {
    //只监听queue()队列的消息
    @RabbitListener(queues = "hello.RabbitMQ")
    public void hanlder(String msg){
        System.out.println("hanlder>>>"+msg);
​
    }
    //只监听queue1()队列的消息
    @RabbitListener(queues = "hello.RabbitMQ1")
    public void hanlder1(String msg){
        System.out.println("hanlder1>>>"+msg);
​
    }
}

测试代码:

在springboot的测试类中注入RabbitTemplate(springboot提供的RabbitMQ模板)

 @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        //两个参数第一个是routingKey、第二个为消息内容
        rabbitTemplate.convertAndSend("hello.RabbitMQ","hello RabbitMQ test");
        rabbitTemplate.convertAndSend("hello.RabbitMQ1","hello RabbitMQ test222");
    }

启动项目后,运行测试类可以看到只有与routingkey相匹配的消费者受到了对应的消息:

荐
                                                        springboot整合消息服务

 

2、Fanout exchange

Fanout策略(只要是与他绑定的队列,都会收到消息与routingKey无关)

1、配置RabbitFanoutConfig:

//Fanout策略(只要是与他绑定的队列,都会收到消息与routingKey无关)
@Configuration
public class RabbitFanoutConfig {
    public final static String FANOUTNAME = "javaboy-fanout";
    //配置了两个消息队列queueOne和queueTwo
    @Bean
    Queue queueOne(){
        return new Queue("queue-one");
    }
    @Bean
    Queue queueTwo(){
        return new Queue("queue-two");
    }
    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange(FANOUTNAME,true,false);
    }
    //将两个队列与FanoutExchange绑定
    @Bean
    Binding bindingOne(){
        return BindingBuilder.bind(queueOne()).to(fanoutExchange());
    }
    @Bean
    Binding bindingTwo(){
        return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
    }
}

2、配置消费者FanoutReceiver:

//配置消费者
@Component
public class FanoutReceiver {
    //两个消费者分别监听两个不同的队列
    @RabbitListener(queues = "queue-one")
    public void hanlder1(String msg){
        System.out.println("FanoutReceiver:hanlder1>>>"+msg);
​
    }
    @RabbitListener(queues = "queue-two")
    public void hanlder2(String msg){
        System.out.println("FanoutReceiver:hanlder2>>>"+msg);
​
    }
}

3、测试类:

@Test
    void rabbitFanout(){
        //三个参数表示RabbitFanoutConfig的名称、routingkey、消息内容
        rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"hello fanout test");
    }

该方式与routingkey无关所有写null即可

查看输出:可以看到两个消费者都收到了消息

荐
                                                        springboot整合消息服务

 

3、Topic exchange

topic策略可以根据routingKey的规则(通配符方式)进行去匹配队列进行转发规则为.#. *为单词,#表示模糊匹配

例如routingkey为:xiaomi.# 那么带有xiaomi.开头的队列都会收到该消息

routingkey为:#.phone.# 表示消息的routingKey中带有phone时 就会去匹配带有phone的队列

1、配置RabbitTopicConfig:

/topic策略可以根据routingKey的规则(通配符方式)进行去匹配队列进行转发规则为*.#.*
    //*为单词,#表示模糊匹配
@Configuration
public class RabbitTopicConfig {
    public final static String TOPICNAME = "javaboy-topic";
​
    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange(TOPICNAME,true,false);
    }
    @Bean
    Queue xiaomi(){
        return new Queue("xiaomi");
    }
    @Bean
    Queue huawei(){
        return new Queue("huawei");
    }
    @Bean
    Queue phone(){
        return new Queue("phone");
    }
​
    @Bean
    Binding xiaomiBinding(){
        //xiaomi.#:表示消息的routingKey是以xiaomi开头的就会路由到xiaomi的队列
        return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
    }
    @Bean
    Binding huaweiBinding(){
        return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
    }
    @Bean
    Binding phoneBinding(){
        //#.phone.#:表示消息的routingKey中带phone的都会路由到phone的队列
        return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#");
    }
}

2、配置消费者TopicReceiver:

@Component
public class TopicReceiver {
    //分别监听名称为xiaomi、huawei、phone的队列
    @RabbitListener(queues = "xiaomi")
    public void handlerXM(String msg){
        System.out.println("TopicReceiver:handlerXM>>>"+msg);
    }
    @RabbitListener(queues = "huawei")
    public void handlerHW(String msg){
        System.out.println("TopicReceiver:handlerHW>>>"+msg);
    }
    @RabbitListener(queues = "phone")
    public void handlerPHONE(String msg){
        System.out.println("TopicReceiver:handlerPHONE>>>"+msg);
    }
}

3、测试类:

@Test
    void rabbitTopic(){
        //根据匹配规则该消息只能被xiaomi的队列收到
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻");
        //根据匹配规则该消息只能被phone的队列收到
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"vivo.phone","vivo手机");
        //根据匹配规则该消息可以别huawei和phone两个队列收到
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机");
​
    }

查看输出:

荐
                                                        springboot整合消息服务荐
                                                        springboot整合消息服务

 

可以看到routingkey为huawei.phone的消息匹配了两个队列,其他两个都只匹配了一个队列

4、Headers exchange

该模式是根据路由规则的header进行匹配的,在进行匹配的时候需要传入一个map集合,routingkey去匹配map即可中的key value,匹配规则可以使any或者all,any表示只要包含任意信息就可以,all表示所有信息都必须匹配

1、配置RabbitHeaderConfig:

@Configuration
public class RabbitHeaderConfig {
    public final static String HEADERNAME = "javaboy-header";
​
    @Bean
    HeadersExchange headersExchange(){
        return new HeadersExchange(HEADERNAME,true,false);
    }
    //分别创建两个不同header的队列
    @Bean
    Queue queueName(){
        return new Queue("name-queue");
    }
    @Bean
    Queue queueAge(){
        return new Queue("age-queue");
    }
    @Bean
    Binding bindingName(){
        Map<String,Object> map = new HashMap<>();
        map.put("name","hello");
        //表示如果routingKey匹配的map集合中的key value 就会将消息转发到对应的路由上
        return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();
    }
​
    @Bean
    Binding bindingAge(){
        return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists();
    }
}

2、创建消费者HeaderReceiver:

@Component
public class HeaderReceiver {
    @RabbitListener(queues = "name-queue")
    public void handlerName(byte[] msg){
        System.out.println("HeaderReceiver:handlerName>>>>"+new String(msg,0,msg.length));
    }
    @RabbitListener(queues = "age-queue")
    public void handlerAge(byte[] msg){
        System.out.println("HeaderReceiver:handlerAge>>>>"+new String(msg,0,msg.length));
    }
}

3、测试代码:

@Test
    public void rabbitHeader(){
        //设置消息,并且设置header,setHeader("name","hello")分别表示map集合中的key、value
        Message nameMessage = 
            MessageBuilder.withBody("hello name".getBytes()).setHeader("name","hello").build();
        Message ageMessage =
            MessageBuilder.withBody("hello 99 age".getBytes()).setHeader("age","99").build();
        rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null,nameMessage);
        rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null,ageMessage);
    }

查看输出:

荐
                                                        springboot整合消息服务

 

改变setheader中的值查看结果:

 Message nameMessage = 
            MessageBuilder.withBody("hello name".getBytes()).setHeader("name","javaboy").build();

 

荐
                                                        springboot整合消息服务

可以看到因为key、value匹配不上只打印了一条消息。

本文地址:https://blog.csdn.net/angegr66/article/details/107363988