欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring整合RabbitMQ实战配置

程序员文章站 2022-07-12 12:29:46
...

本文只讲述spring整合rabbitmq的配置与代码实现,没有安装rabbitmq的教程

配置

通用配置

rabbitmq 的地址配置,admin产生 一个mq的连接工厂

	<!-- 这里配置rabbit的地址,用户,密码,和vhost 我这里是从properties中取参数 -->
	<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
		<property name="addresses" value="${rabbit.address}"/>
		<property name="username" value="${rabbit.username}"/>
		<property name="password" value="${rabbit.password}"/>
		<property name="virtualHost" value="${rabbit.virtualHost}"/>
	</bean>
	<!-- 配置一个admin 多个admin ,启用不同的id即可 -->
	<rabbit:admin id="admin" connection-factory="connectionFactory"/>

生产端配置

这里主要是配置一下队列的名称,绑在在哪个exchange上面,指定一个消息的转换器(重要),然后生成一个template,在需要发送消息的类中,注入这个template就可以发送消息到rabbitmq中了

	<!-- 配置队列的名称,以及队列的一些属性 -->
	<rabbit:queue id="${rabbit.queueName}" name="${rabbit.queueName}" durable="true"
				  auto-delete="false" exclusive="false"  declared-by="admin" />
	<!-- 配置一个交换器 ,一个交换器可以绑定多个队列 队列旁边的key需要在发送的时候用到-->
	<rabbit:direct-exchange id="exchange" name="xchange" durable="true" auto-delete="false" declared-by="admin">
		<rabbit:bindings>
			<rabbit:binding queue="${rabbit.queueName}" key="queueKey" />
		</rabbit:bindings>
	</rabbit:direct-exchange>
	<!-- 消息转换器,这里我用了自定义的消息转换,调用FastJson做序列化处理 -->
	<bean id="fastJsonConverter" class="com.bat.util.FastJsonConverter"></bean>
	<!-- 生成template,把交换器,连接工厂,消息转换器设置进去,就可以直接使用了 -->
	<rabbit:template exchange="exchange" id="template" connection-factory="connectionFactory" message-converter="fastJsonConverter" />

消费端配置

消费端的配置比较简单,定义一个类实现MessageListener,重写里面的onMessage方法

	<!-- 这里用了配置的方法,可以用注解或者其他的方法 -->
	<bean id="messageReceiver" class="com.bat.service.impl.IMessageReceiver"></bean>
	<!-- rabbit的监听,同一个连接的监听配置可以写在一起 -->
	<rabbit:listener-container connection-factory="connectionFactory">
		<rabbit:listener queues="${rabbit.queueName}" ref="messageReceiver"/>
	</rabbit:listener-container>

其他

多个消息源就配置多个链接,admin,exchange,即可,配置的方式雷同

代码

生成端

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class Producer {

    @Autowired
    private AmqpTemplate template ;
	// 这里的key是在xml队列对应的key
    @Override
    public void send(Object entity) {
        template .convertAndSend("queueKey", entity);
    }
}

消费端

这里刚开始没有配置消息转换器,转json的时候一直报错,后来用了自定义的json消息转换器后,body转string后可以正常转换成json了

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class Producer implements MessageListener {
    @Override
    public void onMessage(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
		// pase json and do something
    }
}