Spring整合RabbitMQ实战配置
程序员文章站
2022-07-12 12:29:46
...
本文只讲述spring整合rabbitmq的配置与代码实现,没有安装rabbitmq的教程
配置
通用配置
rabbitmq 的地址配置,admin产生 一个mq的连接工厂
<!-- 这里配置rabbit的地址,用户,密码,和vhost 我这里是从properties中取参数 -->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="addresses" value="${rabbit.address}"/>
<property name="username" value="${rabbit.username}"/>
<property name="password" value="${rabbit.password}"/>
<property name="virtualHost" value="${rabbit.virtualHost}"/>
</bean>
<!-- 配置一个admin 多个admin ,启用不同的id即可 -->
<rabbit:admin id="admin" connection-factory="connectionFactory"/>
生产端配置
这里主要是配置一下队列的名称,绑在在哪个exchange上面,指定一个消息的转换器(重要),然后生成一个template,在需要发送消息的类中,注入这个template就可以发送消息到rabbitmq中了
<!-- 配置队列的名称,以及队列的一些属性 -->
<rabbit:queue id="${rabbit.queueName}" name="${rabbit.queueName}" durable="true"
auto-delete="false" exclusive="false" declared-by="admin" />
<!-- 配置一个交换器 ,一个交换器可以绑定多个队列 队列旁边的key需要在发送的时候用到-->
<rabbit:direct-exchange id="exchange" name="xchange" durable="true" auto-delete="false" declared-by="admin">
<rabbit:bindings>
<rabbit:binding queue="${rabbit.queueName}" key="queueKey" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息转换器,这里我用了自定义的消息转换,调用FastJson做序列化处理 -->
<bean id="fastJsonConverter" class="com.bat.util.FastJsonConverter"></bean>
<!-- 生成template,把交换器,连接工厂,消息转换器设置进去,就可以直接使用了 -->
<rabbit:template exchange="exchange" id="template" connection-factory="connectionFactory" message-converter="fastJsonConverter" />
消费端配置
消费端的配置比较简单,定义一个类实现MessageListener,重写里面的onMessage方法
<!-- 这里用了配置的方法,可以用注解或者其他的方法 -->
<bean id="messageReceiver" class="com.bat.service.impl.IMessageReceiver"></bean>
<!-- rabbit的监听,同一个连接的监听配置可以写在一起 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="${rabbit.queueName}" ref="messageReceiver"/>
</rabbit:listener-container>
其他
多个消息源就配置多个链接,admin,exchange,即可,配置的方式雷同
代码
生成端
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class Producer {
@Autowired
private AmqpTemplate template ;
// 这里的key是在xml队列对应的key
@Override
public void send(Object entity) {
template .convertAndSend("queueKey", entity);
}
}
消费端
这里刚开始没有配置消息转换器,转json的时候一直报错,后来用了自定义的json消息转换器后,body转string后可以正常转换成json了
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class Producer implements MessageListener {
@Override
public void onMessage(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
// pase json and do something
}
}
推荐阅读
-
Spring+Spring MVC+Mybatis 框架整合开发(半注解半配置文件)
-
详解Spring Boot配置使用Logback进行日志记录的实战
-
Spring实战之XML与JavaConfig的混合配置详解
-
Spring-Data-JPA整合MySQL和配置的方法
-
Spring整合struts的配置文件存放问题
-
Spring Boot外部化配置实战解析
-
Spring Boot整合RabbitMQ开发实战详解
-
Spring 配置RabbitMQ 处理异常ErrorHandler
-
SpringCloud实战(六)-高可用的分布式配置中心(Spring Cloud Config)
-
Spring Cloud入门实战(五)------配置中心