简单封装spring-rabbit实现mq组件化
程序员文章站
2022-07-13 09:09:49
...
网上有关spring和rabbitmq整合的博文比比皆是,但是都没有形成整体解决方案,接下来我会通过对spring-rabbit的简单封装实现消息队列服务的组件化。
0、添加所需依赖jar
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.6.0.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.9</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>4.2.6.RELEASE</version> </dependency>
1、创建rabbit生产者工具类
package com.huatech.mq.producer; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; /** * 用于提供Rabbit的操作类 * @since 2017-7-31 * @author lh * */ public class RabbitProducer { /** * 默认交换机名称 */ public static String DEFAULT_EXCHANGE_NAME = "rd-mq-exchange"; private RabbitTemplate rabbitTemplate; /** * * @param rabbitTemplate */ public RabbitProducer(){ } /** * * @param routingKey * @param message */ public <T> void send(String routingKey, T message) { send(DEFAULT_EXCHANGE_NAME, routingKey, message); } /** * * @param exchange * @param routingKey * @param message */ public <T> void send(String exchange,String routingKey, T message) { //实现将message通过json转换&将对象发送 rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() { //实现message操作处理实现 @Override public Message postProcessMessage(Message message) throws AmqpException { //设置信息的属性信息&设置发送模式(PERSISTENT:连续的) MessageProperties mp = message.getMessageProperties(); mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT); mp.setContentType(MessageProperties.CONTENT_TYPE_JSON); return message; } }, new CorrelationData(String.valueOf(message))); } public RabbitTemplate getRabbitTemplate() { return rabbitTemplate; } public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } }
2、在spring中配置生产者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd"> <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!-- 创建connectionFactory --> <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory --> <task:executor id="mqExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" /> <rabbit:connection-factory id="connectionFactory" addresses="${rabbit.addresses}" username="${rabbit.username}" password="${rabbit.password}" executor="mqExecutor"/> <!-- org.springframework.amqp.rabbit.core.RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory" /> <!-- queue 队列声明--> <!-- org.springframework.amqp.core.Queue --> <rabbit:queue id="log_queue" name="log_queue" durable="true" auto-delete="false" exclusive="false" /> <!-- exchange queue binging key 绑定 --> <rabbit:direct-exchange id="rd-mq-exchange" name="rd-mq-exchange" durable="true" auto-delete="false" > <rabbit:bindings> <rabbit:binding queue="log_queue" key="log_queue_key" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- 生产者配置:spring template声明 --> <!-- org.springframework.amqp.rabbit.core.RabbitTemplate channel-transacted true,随着spring事务进行提交或者回滚 --> <rabbit:template id="rabbitTemplate" exchange="rd-mq-exchange" connection-factory="connectionFactory" message-converter="jsonMessageConverter" retry-template="retryTemplate" /> <!-- 重试机制 --> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="3" /> <property name="maxInterval" value="3000" /> </bean> </property> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="1" /> </bean> </property> </bean> <!-- 生产者工具类 --> <bean id="rabbitProducer" class="com.huatech.mq.producer.RabbitProducer"> <property name="rabbitTemplate" ref="rabbitTemplate"/> </bean> </beans>
3、创建消息基类
package com.huatech.mq.model; /** * MQ基类 * @version 3.1 * @author lh * @date 2017年7月31日 */ public class MqBaseModel { /** * 操作 */ protected String operate; /** * 无参构造方法 */ public MqBaseModel() { super(); } public String getOperate() { return operate; } public void setOperate(String operate) { this.operate = operate; } }
4、添加具体model(MqLogModel)
package com.huatech.mq.model; /** * 日志model * @author lh * */ public class MqLogModel extends MqBaseModel { private String info; private String logTime; public String getInfo() { return info; } public void setInfo(String info) { this.info = info; } public String getLogTime() { return logTime; } public void setLogTime(String logTime) { this.logTime = logTime; } @Override public String toString() { return "MqLogModel [logTime=" + logTime + "]"; } }
5、添加消息队列监听接口
package com.huatech.mq.listener; /** * 消息队列监听接口 * @author lh * @version 3.1 * @since 2017-7-31 * @param <T> */ public interface MqListener<T> { void listen(T t); }
6、日志队列实现该接口
package com.huatech.mq.listener; import org.springframework.stereotype.Component; import com.huatech.mq.model.MqLogModel; /** * 日志队列监听 * @author lh * @version 3.0 * @since 2017-8-2 * */ @Component public class LogQueueListener implements MqListener<MqLogModel> { //private static final Logger LOGGER = LoggerFactory.getLogger(LogListener.class); @Override public void listen(MqLogModel t) { System.out.println(String.format("logInfo: %s, logTime:%s", t.getInfo(), t.getLogTime())); } }
7、在spring中配置消费者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd"> <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!-- 创建connectionFactory --> <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory --> <task:executor id="mqExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" /> <rabbit:connection-factory id="connectionFactory" addresses="${rabbit.addresses}" username="${rabbit.username}" password="${rabbit.password}" executor="mqExecutor"/> <!-- org.springframework.amqp.rabbit.core.RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory" /> <!-- queue 队列声明--> <!-- org.springframework.amqp.core.Queue --> <rabbit:queue id="log_queue" name="log_queue" durable="true" auto-delete="false" exclusive="false" /> <!-- 消费者监听容器 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" requeue-rejected="false" concurrency="10" message-converter="jsonMessageConverter" > <rabbit:listener ref="logQueueListener" method="listen" queues="log_queue" /> </rabbit:listener-container> </beans>
8、spring扫包
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <description>Spring Configuration</description> <!-- 加载配置属性文件 --> <context:property-placeholder ignore-unresolvable="true" location="classpath:rabbmitmq.properties" /> <!-- 扫描相关的bean --> <context:component-scan base-package="com.huatech.mq"/> </beans>
9、rabbmitmq.properties
#============== rabbitmq config ==================== rabbit.addresses=127.0.0.1:5672 rabbit.username=guest rabbit.password=guest
10、测试用例
package com.huatech.mq; import java.util.Date; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.huatech.mq.model.MqLogModel; import com.huatech.mq.producer.RabbitProducer; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:spring-context.xml","classpath:spring-rabbit-producer.xml","classpath:spring-rabbit-consumer.xml" }) public class LogQueueTest { @Autowired private RabbitProducer rabbitProducer; @Test public void test(){ for (int i = 0; i < 10; i++) { MqLogModel model = new MqLogModel(); model.setInfo("hello rabbitmq "+i); model.setLogTime(new Date().toString()); rabbitProducer.send("log_queue_key", model); } try { Thread.sleep(1000); } catch (InterruptedException e) { } } }
参考:Spring AMQP
上一篇: Exception与Error的区别?
下一篇: WEB安全测试要点总结