使用 RabbitMQ 实现异步调用
目录
引言
除了上篇文章所讲的 activemq,还有一种流行的开源消息中间件叫 rabbitmq。和 activemq 相比,它具有更高的性能。
rabbitmq 不再基于 jms 规范,也没有选择 java 作为底层实现语言。 它基于另一种消息通信协议,名为 amqp,并采用 erlang 语言作为技术实现。 rabbitmq 提供了众多语言客户端,能够与 spring 框架整合,spring boot 也提供了对 rabbitmq 的支持。
rabbitmq 官网: http://www.rabbitmq.com
启动 rabbitmq 服务器
运行 rabbitmq 容器
rabbitmq 官方已经提供了自己的 docker 容器,先下载 rabbitmq:3-management 镜像来启动 rabbitmq 容器, 之所以选择这个镜像是因为它拥有一个 web 控制台,可以通过浏览器来访问。
docker pull rabbitmq:3-management
rabbitmq 除了控制台,还提供了 http api 方式,可方便应用程序使用。
下面使用如下 docker 命令启动 rabbitmq
docker run -d -p 15672:15672 -p 5672:5672 -e rabbitmq_default_user=admin -e rabbitmq_default_pass=admin --name rabbitmq rabbitmq:3-management
在启动 rabbitmq 容器时,它对宿主机暴露了两个端口号
- 15672: 表示rabbitmq 控制台端口号,可在浏览器中通过控制台来执行 rabbitmq 的相关操作
- 5672 表示 rabbitmq 监听的tcp 端口号,应用程序可以通过该端口号与 rabbitmq 建立 tcp 连接,并完成后续的异步消息通信
此外,启动时还有两个环境变量
- rabbitmq_default_user : 设置控制台默认用户名, 默认为 guest
- rabbitmq_default_pass: 设置控制台默认密码,默认为 guest
rabbitmq 控制台
rabbitmq 容器启动完毕后,打开浏览器,并在地址栏中输入 http://localhost:15672/
,并且输入登录的用户名和密码,就可以看到控制台如下所示
在上面管理界面中,包含 6 个功能菜单
- overview: 用于查看 rabbitmq 基本信息
- connections: 用于查看 rabbitmq 客户端的连接信息
- channels: 用于查看 rabbitmq 的通道
- exchanges:用于查看 rabbitmq 的交换机
- queues: 用于查看 rabbitmq 的队列
- admin: 用于管理 rabbitmq 的用户,虚拟主机,策略等数据
exchange 和 queue
rabbitmq 只有 queue, 没有 topic,因为可通过 exchange 与 queue 的组合来实现 topic 所具备的功能。rabbitmq 的消息模型如下图所示
在 exchange 和 queue 间有一个 binding 关系,当消息从 producer 发送到 exchange 中时,会根据 binding 来路由消息的去向。
- 如果 binding 各不相同,那么该消息将路由到其中一个 queue 中,随后将被一个 consumer 所消费,此时实现了 "点对点"的消息通信模型。
- 如果 binding 完全相同,那么该消息就会路由到每个 queue 中,随后将会被每个 consumer 消费,这样就实现了 “发布与订阅” 的消息通信模型
因此可将 binding 理解为 exchange 到 queue 的路由规则,这些规则可通过 rabbitmq 所提供的客户端 api 来控制,也可通过 rabbitmq 提供的控制台来管理。
rabbitmq 提供了一个默认的 exchange(amqp default),在控制台的 exchange 菜单中就可以看到它,简单情况下,只需要使用默认的 exchange 即可,当需要提供发布与订阅功能时才会使用自定义的 exchange。
开发服务端和客户端
下面我们就将 spring boot 与 rabbitmq 进行整合,先开发一个服务端作为消息的消费者,再开发一个客户端作为消息的生产者,随后运行客户端,并查看服务端中接收到的消息。
开发服务端
创建一个名为 rabbitmq-hello-server 的 maven 项目或者 spring starter project, 在 pom.xml 文件中添加下面 maven 依赖
<parent> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-parent</artifactid> <version>1.5.19.release</version> <relativepath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> </dependencies>
spring boot 框架中已经添加了对 rabbitmq 的支持,只需要依赖 spring-boot-starter-amqp
就可以启动 rabbitmq,此时还需要在 application.properties
配置文件中添加 rabbitmq 的相关配置项
spring.rabbitmq.addresses=127.0.0.1:5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin
接下来创建 helloserver 类,封装服务端代码
package demo.msa.rabbitmq; import org.springframework.amqp.rabbit.annotation.rabbitlistener; import org.springframework.stereotype.component; @component public class helloserver { @rabbitlistener(queues = "hello-queue") public void receive(string message) { system.out.println(message); } }
只需要在 receive()
方法上定义 @rabbitlistener
,并且设置 queues
参数来指定消费者需要监听的的队列名称。
最后,编写一个 spring boot 应用程序启动类来启动服务器
package demo.msa.rabbitmq; import org.springframework.amqp.core.queue; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.context.annotation.bean; @springbootapplication public class rabbitmqhelloserverapplication { @bean public queue helloqueue() { return new queue("hello-queue"); } public static void main(string[] args) { springapplication.run(rabbitmqhelloserverapplication.class, args); } }
在 rabbitmq 中,必须通过程序来显式创建队列。服务端启动完毕后,将持续监听 rabbitmq 的 hello-queue 队列中即将到来的消息,该消息由客户端来发送。
开发客户端
创建一个名为 rabbitmq-hello-client 的 maven 项目或者 spring starter project, pom 中的依赖与服务端一致。客户端的 application.properties 文件与服务端一致。
<parent> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-parent</artifactid> <version>1.5.19.release</version> <relativepath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> </dependencies>
接下来创建一个名为 helloclient 的类,将其作为客户端
package demo.msa.rabbitmq; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; @component public class helloclient { @autowired private rabbittemplate rabbitmqtemplate; public void send(string message) { rabbitmqtemplate.convertandsend("hello-queue", message); } }
最后编写 spring boot 应用程序启动类来启动客户端
package demo.msa.rabbitmq; import javax.annotation.postconstruct; import org.springframework.amqp.core.queue; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.context.annotation.bean; @springbootapplication public class rabbitmqhelloclientapplication { @autowired private helloclient helloclient; @bean public queue helloqueue() { return new queue("hello-queue"); } @postconstruct public void init() { helloclient.send("hello world!"); } public static void main(string[] args) { springapplication.run(rabbitmqhelloclientapplication.class, args).close(); } }
与服务端一样,此处使用 @bean
注解的 helloqueue()
方法创建一个名为 hello-queue
的队列,这样可以保证当客户端在服务端之前启动时,也能创建所需的队列。而且 rabbitmq 可以确保不会创建同名的队列,因此可分别在服务端与客户端创建同名的队列。
运行 main 方法可以启动客户端应用程序,此时将在服务端看到客户端发送的消息,也可以在 rabbitmq 控制台中看到消息队列当前的状态。
java bean 类型传输
上面发送和接收的消息只是 string 类型,如果发送的消息是一个普通的 java bean 类型,应该如何调用呢?
java bean 类型则必须实现 serializable
序列化接口才能正常调用,这是因为 rabbitmq 所传送的消息是 byte[]
类型,当客户端发送消息需要进行序列化(也就是讲 java 类型转换为 byte[] 类型),当服务端接收消息前需要先反序列化,因此发送和接收的消息对象必须实现 jdk 的序列化接口。
除了这种序列化方式外,我们也可以使用 jackson 来实现,而且 rabbitmq 已经为我们提供了 jackson 序列化的方式,这种方式更加高效。所需要做的是定义一个 jackson2jsonmessageconverter
的 spring bean。
@bean public jackson2jsonmessageconverter messageconverter() { return new jackson2jsonmessageconverter(); }
结语
rabbitmq 的性能非常高效和稳定,也能非常方便的与 spring boot 应用程序集成,还拥有非常丰富的官方文档和控制台,因此选择 rabbitmq 作为服务之间的异步消息调用平台,将成为整个微服务架构中的 "消息中心"。
参考
- 《架构探险—轻量级微服务架构》
推荐阅读