欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spring Boot 入门之消息中间件的使用

程序员文章站 2023-11-13 11:00:34
一、前言 在消息中间件中有 2 个重要的概念:消息代理和目的地。当消息发送者发送消息后,消息就被消息代理接管,消息代理保证消息传递到指定目的地。 我们常用的消息代理有...

一、前言

在消息中间件中有 2 个重要的概念:消息代理和目的地。当消息发送者发送消息后,消息就被消息代理接管,消息代理保证消息传递到指定目的地。

我们常用的消息代理有 jms 和 amqp 规范。对应地,它们常见的实现分别是 activemq 和 rabbitmq。

二、整合 activemq

2.1 添加依赖

<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-activemq</artifactid>
</dependency>
<!-- 如果需要配置连接池,添加如下依赖 -->
<dependency> 
  <groupid>org.apache.activemq</groupid> 
  <artifactid>activemq-pool</artifactid> 
</dependency>

2.2 添加配置

# activemq 配置
spring.activemq.broker-url=tcp://192.168.2.12:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=false
spring.activemq.pool.max-connections=50
# 使用发布/订阅模式时,下边配置需要设置成 true
spring.jms.pub-sub-domain=false

此处 spring.activemq.pool.enabled=false,表示关闭连接池。

2.3 编码

配置类:

@configuration
public class jmsconfirguration {
  public static final string queue_name = "activemq_queue";
  
  public static final string topic_name = "activemq_topic";
  
  @bean
  public queue queue() {
    return new activemqqueue(queue_name);
  }
  
  @bean
  public topic topic() {
    return new activemqtopic(topic_name);
  }
}

负责创建队列和主题。

消息生产者:

@component
public class jmssender {
  @autowired
  private queue queue;
  
  @autowired
  private topic topic;
  
  @autowired
  private jmsmessagingtemplate jmstemplate;
  
  public void sendbyqueue(string message) {
    this.jmstemplate.convertandsend(queue, message);
  }
  
  public void sendbytopic(string message) {
    this.jmstemplate.convertandsend(topic, message);
  }
}

消息消费者:

@component
public class jmsreceiver {
  
  @jmslistener(destination = jmsconfirguration.queue_name)
  public void receivebyqueue(string message) {
    system.out.println("接收队列消息:" + message);
  }
  
  @jmslistener(destination = jmsconfirguration.topic_name)
  public void receivebytopic(string message) {
    system.out.println("接收主题消息:" + message);
  }
}

消息消费者使用 @jmslistener 注解监听消息。

2.4 测试

@runwith(springrunner.class)
@springboottest
public class jmstest {
  @autowired
  private jmssender sender;
  @test
  public void testsendbyqueue() {
    for (int i = 1; i < 6; i++) {
      this.sender.sendbyqueue("hello activemq queue " + i);
    }
  }
  
  @test
  public void testsendbytopic() {
    for (int i = 1; i < 6; i++) {
      this.sender.sendbytopic("hello activemq topic " + i);
    }
  }
}

打印结果:

接收队列消息:hello activemq queue 1
接收队列消息:hello activemq queue 2
接收队列消息:hello activemq queue 3
接收队列消息:hello activemq queue 4
接收队列消息:hello activemq queue 5

测试发布/订阅模式时,设置 spring.jms.pub-sub-domain=true

接收主题消息:hello activemq topic 1
接收主题消息:hello activemq topic 2
接收主题消息:hello activemq topic 3
接收主题消息:hello activemq topic 4
接收主题消息:hello activemq topic 5

三、整合 rabbitmq

3.1 添加依赖

<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

3.2 添加配置

spring.rabbitmq.host=192.168.2.30
spring.rabbitmq.port=5672
spring.rabbitmq.username=light
spring.rabbitmq.password=light
spring.rabbitmq.virtual-host=/test

3.3 编码

配置类:

@configuration
public class amqpconfirguration {
  //=============简单、工作队列模式===============
  
  public static final string simple_queue = "simple_queue";
  @bean
  public queue queue() {
    return new queue(simple_queue, true);
  }
  
  //===============发布/订阅模式============
  
  public static final string ps_queue_1 = "ps_queue_1";
  public static final string ps_queue_2 = "ps_queue_2";
  public static final string fanout_exchange = "fanout_exchange";
  
  @bean
  public queue psqueue1() {
    return new queue(ps_queue_1, true);
  }
  
  @bean
  public queue psqueue2() {
    return new queue(ps_queue_2, true);
  }
  
  @bean
  public fanoutexchange fanoutexchange() {
    return new fanoutexchange(fanout_exchange);
  }
  
  @bean
  public binding fanoutbinding1() {
    return bindingbuilder.bind(psqueue1()).to(fanoutexchange());
  }
  
  @bean
  public binding fanoutbinding2() {
    return bindingbuilder.bind(psqueue2()).to(fanoutexchange());
  }
  //===============路由模式============
  
  public static final string routing_queue_1 = "routing_queue_1";
  public static final string routing_queue_2 = "routing_queue_2";
  public static final string direct_exchange = "direct_exchange";
  
  @bean
  public queue routingqueue1() {
    return new queue(routing_queue_1, true);
  }
  
  @bean
  public queue routingqueue2() {
    return new queue(routing_queue_2, true);
  }
  
  @bean
  public directexchange directexchange() {
    return new directexchange(direct_exchange);
  }
  
  @bean
  public binding directbinding1() {
    return bindingbuilder.bind(routingqueue1()).to(directexchange()).with("user");
  }
  
  @bean
  public binding directbinding2() {
    return bindingbuilder.bind(routingqueue2()).to(directexchange()).with("order");
  }
  
  //===============主题模式============
  
  public static final string topic_queue_1 = "topic_queue_1";
  public static final string topic_queue_2 = "topic_queue_2";
  public static final string topic_exchange = "topic_exchange";
  
  @bean
  public queue topicqueue1() {
    return new queue(topic_queue_1, true);
  }
  
  @bean
  public queue topicqueue2() {
    return new queue(topic_queue_2, true);
  }
  
  @bean
  public topicexchange topicexchange() {
    return new topicexchange(topic_exchange);
  }
  
  @bean
  public binding topicbinding1() {
    return bindingbuilder.bind(topicqueue1()).to(topicexchange()).with("user.add");
  }
  
  @bean
  public binding topicbinding2() {
    return bindingbuilder.bind(topicqueue2()).to(topicexchange()).with("user.#");
  }  
}

rabbitmq 有多种工作模式,因此配置比较多。想了解相关内容的读者可以查看《rabbitmq 工作模式介绍》或者自行百度相关资料。

消息生产者:

@component
public class amqpsender {
  @autowired
  private amqptemplate amqptemplate;
  /**
   * 简单模式发送
   * 
   * @param message
   */
  public void simplesend(string message) {
    this.amqptemplate.convertandsend(amqpconfirguration.simple_queue, message);
  }
  /**
   * 发布/订阅模式发送
   * 
   * @param message
   */
  public void pssend(string message) {
    this.amqptemplate.convertandsend(amqpconfirguration.fanout_exchange, "", message);
  }
  /**
   * 路由模式发送
   * 
   * @param message
   */
  public void routingsend(string routingkey, string message) {
    this.amqptemplate.convertandsend(amqpconfirguration.direct_exchange, routingkey, message);
  }
  /**
   * 主题模式发送
   * 
   * @param routingkey
   * @param message
   */
  public void topicsend(string routingkey, string message) {
    this.amqptemplate.convertandsend(amqpconfirguration.topic_exchange, routingkey, message);
  }
}

消息消费者:

@component
public class amqpreceiver {
  /**
   * 简单模式接收
   * 
   * @param message
   */
  @rabbitlistener(queues = amqpconfirguration.simple_queue)
  public void simplereceive(string message) {
    system.out.println("接收消息:" + message);
  }
  /**
   * 发布/订阅模式接收
   * 
   * @param message
   */
  @rabbitlistener(queues = amqpconfirguration.ps_queue_1)
  public void psreceive1(string message) {
    system.out.println(amqpconfirguration.ps_queue_1 + "接收消息:" + message);
  }
  @rabbitlistener(queues = amqpconfirguration.ps_queue_2)
  public void psreceive2(string message) {
    system.out.println(amqpconfirguration.ps_queue_2 + "接收消息:" + message);
  }
  /**
   * 路由模式接收
   * 
   * @param message
   */
  @rabbitlistener(queues = amqpconfirguration.routing_queue_1)
  public void routingreceive1(string message) {
    system.out.println(amqpconfirguration.routing_queue_1 + "接收消息:" + message);
  }
  @rabbitlistener(queues = amqpconfirguration.routing_queue_2)
  public void routingreceive2(string message) {
    system.out.println(amqpconfirguration.routing_queue_2 + "接收消息:" + message);
  }
  /**
   * 主题模式接收
   * 
   * @param message
   */
  @rabbitlistener(queues = amqpconfirguration.topic_queue_1)
  public void topicreceive1(string message) {
    system.out.println(amqpconfirguration.topic_queue_1 + "接收消息:" + message);
  }
  
  @rabbitlistener(queues = amqpconfirguration.topic_queue_2)
  public void topicreceive2(string message) {
    system.out.println(amqpconfirguration.topic_queue_2 + "接收消息:" + message);
  }
}

消息消费者使用 @rabbitlistener 注解监听消息。

3.4 测试

@runwith(springrunner.class)
@springboottest
public class amqptest {
  @autowired
  private amqpsender sender;
  @test
  public void testsimplesend() {
    for (int i = 1; i < 6; i++) {
      this.sender.simplesend("test simplesend " + i);
    }
  }
  @test
  public void testpssend() {
    for (int i = 1; i < 6; i++) {
      this.sender.pssend("test pssend " + i);
    }
  }
  
  @test
  public void testroutingsend() {
    for (int i = 1; i < 6; i++) {
      this.sender.routingsend("order", "test routingsend " + i);
    }
  }
  
  @test
  public void testtopicsend() {
    for (int i = 1; i < 6; i++) {
      this.sender.topicsend("user.add", "test topicsend " + i);
    }
  }
}

测试结果略过。。。

踩坑提醒1:access_refused – login was refused using authentication mechanism plain

解决方案:

1) 请确保用户名和密码是否正确,需要注意的是用户名和密码的值是否包含空格或制表符(笔者测试时就是因为密码多了一个制表符导致认证失败)。

2) 如果测试账户使用的是 guest,需要修改 rabbitmq.conf 文件。在该文件中添加 “loopback_users = none” 配置。

踩坑提醒2:cannot prepare queue for listener. either the queue doesn't exist or the broker will not allow us to use it

解决方案:

我们可以登陆 rabbitmq 的管理界面,在 queue 选项中手动添加对应的队列。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。