详解spring boot集成RabbitMQ
rabbitmq作为amqp的代表性产品,在项目中大量使用。结合现在主流的spring boot,极大简化了开发过程中所涉及到的消息通信问题。
首先正确的安装rabbitmq及运行正常。
rabbitmq需啊erlang环境,所以首先安装对应版本的erlang,可在rabbitmq官网下载
# rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm
使用yum安装rabbitmq,避免缺少依赖包引起的安装失败
# yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm
启动rabbitmq
# /sbin/service rabbitmq-server start
由于rabbitmq默认提供的guest用户只能本地访问,所以额外创建用户用于测试
# /sbin/rabbitmqctl add_user test test123 用户名:test,密码:test123
开启web管理插件
# rabbitmq-plugins enable rabbitmq_management
并使用之前创建的用户登录,并设置该用户为administrator,虚拟主机地址为/
spring boot 引入相关依赖
<dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> </dependencies>
消息生产者
application.properties添加一下配置
spring.rabbitmq.host=192.168.1.107 spring.rabbitmq.port=5672 spring.rabbitmq.username=test spring.rabbitmq.password=test123 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
spring boot配置类,作用为指定队列,交换器类型及绑定操作
import org.springframework.amqp.core.binding; import org.springframework.amqp.core.bindingbuilder; import org.springframework.amqp.core.queue; import org.springframework.amqp.core.topicexchange; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configuration public class rabbitconfig { //声明队列 @bean public queue queue1() { return new queue("hello.queue1", true); // true表示持久化该队列 } @bean public queue queue2() { return new queue("hello.queue2", true); } //声明交互器 @bean topicexchange topicexchange() { return new topicexchange("topicexchange"); } //绑定 @bean public binding binding1() { return bindingbuilder.bind(queue1()).to(topicexchange()).with("key.1"); } @bean public binding binding2() { return bindingbuilder.bind(queue2()).to(topicexchange()).with("key.#"); } }
共声明了2个队列,分别是hello.queue1,hello.queue2,交换器类型为topicexchange,并与hello.queue1,hello.queue2队列分别绑定。
生产者类
import java.util.uuid; import javax.annotation.postconstruct; import org.springframework.amqp.core.message; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.amqp.rabbit.core.rabbittemplate.returncallback; import org.springframework.amqp.rabbit.support.correlationdata; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; @component public class sender implements rabbittemplate.confirmcallback, returncallback { @autowired private rabbittemplate rabbittemplate; @postconstruct public void init() { rabbittemplate.setconfirmcallback(this); rabbittemplate.setreturncallback(this); } @override public void confirm(correlationdata correlationdata, boolean ack, string cause) { if (ack) { system.out.println("消息发送成功:" + correlationdata); } else { system.out.println("消息发送失败:" + cause); } } @override public void returnedmessage(message message, int replycode, string replytext, string exchange, string routingkey) { system.out.println(message.getmessageproperties().getcorrelationidstring() + " 发送失败"); } //发送消息,不需要实现任何接口,供外部调用。 public void send(string msg){ correlationdata correlationid = new correlationdata(uuid.randomuuid().tostring()); system.out.println("开始发送消息 : " + msg.tolowercase()); string response = rabbittemplate.convertsendandreceive("topicexchange", "key.1", msg, correlationid).tostring(); system.out.println("结束发送消息 : " + msg.tolowercase()); system.out.println("消费者响应 : " + response + " 消息处理完成"); } }
要点:
1.注入rabbittemplate
2.实现rabbittemplate.confirmcallback, rabbittemplate.returncallback接口(非必须)。
confirmcallback接口用于实现消息发送到rabbitmq交换器后接收ack回调。returncallback接口用于实现消息发送到rabbitmq交换器,但无相应队列与交换器绑定时的回调。
3.实现消息发送方法。调用rabbittemplate相应的方法即可。rabbittemplate常用发送方法有
rabbittemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.message rabbittemplate.convertandsend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.message后发送 rabbittemplate.convertsendandreceive(message) //转换并发送消息,且等待消息者返回响应消息。
针对业务场景选择合适的消息发送方式即可。
消息消费者
application.properties添加一下配置
spring.rabbitmq.host=192.168.1.107 spring.rabbitmq.port=5672 spring.rabbitmq.username=test spring.rabbitmq.password=test123 spring.rabbitmq.listener.concurrency=2 //最小消息监听线程数 spring.rabbitmq.listener.max-concurrency=2 //最大消息监听线程数
消费者类
import org.springframework.amqp.rabbit.annotation.rabbitlistener; import org.springframework.stereotype.component; @component public class receiver { @rabbitlistener(queues = "hello.queue1") public string processmessage1(string msg) { system.out.println(thread.currentthread().getname() + " 接收到来自hello.queue1队列的消息:" + msg); return msg.touppercase(); } @rabbitlistener(queues = "hello.queue2") public void processmessage2(string msg) { system.out.println(thread.currentthread().getname() + " 接收到来自hello.queue2队列的消息:" + msg); } }
由于定义了2个队列,所以分别定义不同的监听器监听不同的队列。由于最小消息监听线程数和最大消息监听线程数都是2,所以每个监听器各有2个线程实现监听功能。
要点:
1.监听器参数类型与消息实际类型匹配。在生产者中发送的消息实际类型是string,所以这里监听器参数类型也是string。
2.如果监听器需要有响应返回给生产者,直接在监听方法中return即可。
运行测试
import java.util.date; import org.junit.test; import org.junit.runner.runwith; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.test.context.springboottest; import org.springframework.test.context.junit4.springjunit4classrunner; import com.sam.demo.rabbitmq.application; import com.sam.demo.rabbitmq.sender.sender; @runwith(value=springjunit4classrunner.class) @springboottest(classes = application.class) public class rabbittests { @autowired private sender sender; @test public void sendtest() throws exception { while(true){ string msg = new date().tostring(); sender.send(msg); thread.sleep(1000); } } }
输出:
开始发送消息 : wed mar 29 23:20:52 cst 2017 simpleasynctaskexecutor-1 接收到来自hello.queue2队列的消息:wed mar 29 23:20:52 cst 2017 simpleasynctaskexecutor-2 接收到来自hello.queue1队列的消息:wed mar 29 23:20:52 cst 2017 结束发送消息 : wed mar 29 23:20:52 cst 2017 消费者响应 : wed mar 29 23:20:52 cst 2017 消息处理完成 ------------------------------------------------ 消息发送成功:correlationdata [id=340d14e6-cfcc-4653-9f95-29b37d50f886] 开始发送消息 : wed mar 29 23:20:53 cst 2017 simpleasynctaskexecutor-1 接收到来自hello.queue1队列的消息:wed mar 29 23:20:53 cst 2017 simpleasynctaskexecutor-2 接收到来自hello.queue2队列的消息:wed mar 29 23:20:53 cst 2017 结束发送消息 : wed mar 29 23:20:53 cst 2017 消费者响应 : wed mar 29 23:20:53 cst 2017 消息处理完成 ------------------------------------------------ 消息发送成功:correlationdata [id=e4e01f89-d0d4-405e-80f0-85bb20238f34] 开始发送消息 : wed mar 29 23:20:54 cst 2017 simpleasynctaskexecutor-2 接收到来自hello.queue1队列的消息:wed mar 29 23:20:54 cst 2017 simpleasynctaskexecutor-1 接收到来自hello.queue2队列的消息:wed mar 29 23:20:54 cst 2017 结束发送消息 : wed mar 29 23:20:54 cst 2017 消费者响应 : wed mar 29 23:20:54 cst 2017 消息处理完成 ------------------------------------------------
如果需要使用的其他的交换器类型,spring中都已提供实现,所有的交换器均实现org.springframework.amqp.core.abstractexchange接口。
常用交换器类型如下:
direct(directexchange):direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。
topic(topicexchange):按规则转发消息(最灵活)。
headers(headersexchange):设置header attribute参数类型的交换机。
fanout(fanoutexchange):转发消息到所有绑定队列。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: SpringMVC中日期格式的转换
推荐阅读
-
详解spring boot集成RabbitMQ
-
详解用Spring Boot零配置快速创建web项目
-
详解Spring Boot Web项目之参数绑定
-
详解Spring Boot2 Webflux的全局异常处理
-
详解spring boot Websocket使用笔记
-
Spring Boot 集成MyBatis 教程详解
-
详解spring boot 使用application.properties 进行外部配置
-
Spring Boot集成Redis实现缓存机制(从零开始学Spring Boot)
-
详解Spring Boot中PATCH上传文件的问题
-
Spring Boot集成MyBatis的方法