欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用 RabbitMQ 实现异步调用

程序员文章站 2022-03-26 14:20:34
[TOC] 引言 除了上篇文章所讲的 ActiveMQ,还有一种流行的开源消息中间件叫 RabbitMQ。和 ActiveMQ 相比,它具有更高的性能。 RabbitMQ 不再基于 JMS 规范,也没有选择 Java 作为底层实现语言。 它基于另一种消息通信协议,名为 AMQP,并采用 Erlang ......

目录

引言

除了上篇文章所讲的 activemq,还有一种流行的开源消息中间件叫 rabbitmq。和 activemq 相比,它具有更高的性能。

rabbitmq 不再基于 jms 规范,也没有选择 java 作为底层实现语言。 它基于另一种消息通信协议,名为 amqp,并采用 erlang 语言作为技术实现。 rabbitmq 提供了众多语言客户端,能够与 spring 框架整合,spring boot 也提供了对 rabbitmq 的支持。

rabbitmq 官网: http://www.rabbitmq.com

启动 rabbitmq 服务器

运行 rabbitmq 容器

rabbitmq 官方已经提供了自己的 docker 容器,先下载 rabbitmq:3-management 镜像来启动 rabbitmq 容器, 之所以选择这个镜像是因为它拥有一个 web 控制台,可以通过浏览器来访问。

docker pull rabbitmq:3-management

rabbitmq 除了控制台,还提供了 http api 方式,可方便应用程序使用。

下面使用如下 docker 命令启动 rabbitmq

 docker run -d -p 15672:15672 -p 5672:5672 -e rabbitmq_default_user=admin -e rabbitmq_default_pass=admin --name rabbitmq rabbitmq:3-management

在启动 rabbitmq 容器时,它对宿主机暴露了两个端口号

  • 15672: 表示rabbitmq 控制台端口号,可在浏览器中通过控制台来执行 rabbitmq 的相关操作
  • 5672 表示 rabbitmq 监听的tcp 端口号,应用程序可以通过该端口号与 rabbitmq 建立 tcp 连接,并完成后续的异步消息通信

此外,启动时还有两个环境变量

  • rabbitmq_default_user : 设置控制台默认用户名, 默认为 guest
  • rabbitmq_default_pass: 设置控制台默认密码,默认为 guest

rabbitmq 控制台

rabbitmq 容器启动完毕后,打开浏览器,并在地址栏中输入 http://localhost:15672/ ,并且输入登录的用户名和密码,就可以看到控制台如下所示
使用 RabbitMQ 实现异步调用

在上面管理界面中,包含 6 个功能菜单

  • overview: 用于查看 rabbitmq 基本信息
  • connections: 用于查看 rabbitmq 客户端的连接信息
  • channels: 用于查看 rabbitmq 的通道
  • exchanges:用于查看 rabbitmq 的交换机
  • queues: 用于查看 rabbitmq 的队列
  • admin: 用于管理 rabbitmq 的用户,虚拟主机,策略等数据

exchange 和 queue

rabbitmq 只有 queue, 没有 topic,因为可通过 exchange 与 queue 的组合来实现 topic 所具备的功能。rabbitmq 的消息模型如下图所示
使用 RabbitMQ 实现异步调用

在 exchange 和 queue 间有一个 binding 关系,当消息从 producer 发送到 exchange 中时,会根据 binding 来路由消息的去向。

  • 如果 binding 各不相同,那么该消息将路由到其中一个 queue 中,随后将被一个 consumer 所消费,此时实现了 "点对点"的消息通信模型。
  • 如果 binding 完全相同,那么该消息就会路由到每个 queue 中,随后将会被每个 consumer 消费,这样就实现了 “发布与订阅” 的消息通信模型

因此可将 binding 理解为 exchange 到 queue 的路由规则,这些规则可通过 rabbitmq 所提供的客户端 api 来控制,也可通过 rabbitmq 提供的控制台来管理。

rabbitmq 提供了一个默认的 exchange(amqp default),在控制台的 exchange 菜单中就可以看到它,简单情况下,只需要使用默认的 exchange 即可,当需要提供发布与订阅功能时才会使用自定义的 exchange。

开发服务端和客户端

下面我们就将 spring boot 与 rabbitmq 进行整合,先开发一个服务端作为消息的消费者,再开发一个客户端作为消息的生产者,随后运行客户端,并查看服务端中接收到的消息。

开发服务端

创建一个名为 rabbitmq-hello-server 的 maven 项目或者 spring starter project, 在 pom.xml 文件中添加下面 maven 依赖

    <parent>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-parent</artifactid>
        <version>1.5.19.release</version>
        <relativepath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-amqp</artifactid>
        </dependency>
    </dependencies>

spring boot 框架中已经添加了对 rabbitmq 的支持,只需要依赖 spring-boot-starter-amqp 就可以启动 rabbitmq,此时还需要在 application.properties 配置文件中添加 rabbitmq 的相关配置项

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

接下来创建 helloserver 类,封装服务端代码

package demo.msa.rabbitmq;

import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;

@component
public class helloserver {

    @rabbitlistener(queues = "hello-queue")
    public void receive(string message) {
        system.out.println(message);
    }
}

只需要在 receive() 方法上定义 @rabbitlistener ,并且设置 queues 参数来指定消费者需要监听的的队列名称。

最后,编写一个 spring boot 应用程序启动类来启动服务器

package demo.msa.rabbitmq;

import org.springframework.amqp.core.queue;
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.context.annotation.bean;

@springbootapplication
public class rabbitmqhelloserverapplication {

    @bean
    public queue helloqueue() {
        return new queue("hello-queue");
    }
    
    public static void main(string[] args) {
        springapplication.run(rabbitmqhelloserverapplication.class, args);
    }

}

在 rabbitmq 中,必须通过程序来显式创建队列。服务端启动完毕后,将持续监听 rabbitmq 的 hello-queue 队列中即将到来的消息,该消息由客户端来发送。

开发客户端

创建一个名为 rabbitmq-hello-client 的 maven 项目或者 spring starter project, pom 中的依赖与服务端一致。客户端的 application.properties 文件与服务端一致。

    <parent>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-parent</artifactid>
        <version>1.5.19.release</version>
        <relativepath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-amqp</artifactid>
        </dependency>
    </dependencies>

接下来创建一个名为 helloclient 的类,将其作为客户端

package demo.msa.rabbitmq;

import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;

@component
public class helloclient {
    
    @autowired
    private rabbittemplate rabbitmqtemplate;
    
    public void send(string message) {
        rabbitmqtemplate.convertandsend("hello-queue", message);
    }
}

最后编写 spring boot 应用程序启动类来启动客户端

package demo.msa.rabbitmq;

import javax.annotation.postconstruct;

import org.springframework.amqp.core.queue;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.context.annotation.bean;

@springbootapplication
public class rabbitmqhelloclientapplication {
    
    @autowired
    private helloclient helloclient;
    
    @bean
    public queue helloqueue() {
        return new queue("hello-queue");
    }
    
    @postconstruct
    public void init() {
        helloclient.send("hello world!");
    }
    
    public static void main(string[] args) {
        springapplication.run(rabbitmqhelloclientapplication.class, args).close();
    }

}

与服务端一样,此处使用 @bean 注解的 helloqueue() 方法创建一个名为 hello-queue 的队列,这样可以保证当客户端在服务端之前启动时,也能创建所需的队列。而且 rabbitmq 可以确保不会创建同名的队列,因此可分别在服务端与客户端创建同名的队列。

运行 main 方法可以启动客户端应用程序,此时将在服务端看到客户端发送的消息,也可以在 rabbitmq 控制台中看到消息队列当前的状态。

java bean 类型传输

上面发送和接收的消息只是 string 类型,如果发送的消息是一个普通的 java bean 类型,应该如何调用呢?

java bean 类型则必须实现 serializable 序列化接口才能正常调用,这是因为 rabbitmq 所传送的消息是 byte[] 类型,当客户端发送消息需要进行序列化(也就是讲 java 类型转换为 byte[] 类型),当服务端接收消息前需要先反序列化,因此发送和接收的消息对象必须实现 jdk 的序列化接口。

除了这种序列化方式外,我们也可以使用 jackson 来实现,而且 rabbitmq 已经为我们提供了 jackson 序列化的方式,这种方式更加高效。所需要做的是定义一个 jackson2jsonmessageconverter 的 spring bean。

    @bean
    public jackson2jsonmessageconverter messageconverter() {
        return new jackson2jsonmessageconverter();
    }

结语

rabbitmq 的性能非常高效和稳定,也能非常方便的与 spring boot 应用程序集成,还拥有非常丰富的官方文档和控制台,因此选择 rabbitmq 作为服务之间的异步消息调用平台,将成为整个微服务架构中的 "消息中心"。

参考

  • 《架构探险—轻量级微服务架构》