Integrating ActiveMQ with Spring Boot 2.0
2022-04-30 19:45:00
【1】pom file
Add the ActiveMQ dependencies:
<!-- ActiveMQ integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Connection pool, needed for the spring.activemq.pool.* settings in Spring Boot 2.0 -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>
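【2】yml configuration
The yml configuration file is as follows:
spring:
  activemq:
    user: root
    password: 123456
    broker-url: tcp://127.0.0.1:61616
    pool:
      enabled: true
      max-connections: 50
    packages:
      # Trusts every package when deserializing ObjectMessage payloads.
      # Convenient for a demo; in production, prefer listing packages under "trusted".
      trust-all: true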
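【3】Producer and consumer with a queue
The main application class is configured as follows:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

@SpringBootApplication
@EnableJms // enables processing of @JmsListener annotations
public class HhProvinceApplication {
    public static void main(String[] args) {
        SpringApplication.run(HhProvinceApplication.class, args);
    }
}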
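MyActiveMQConfig is as follows:
import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyActiveMQConfig {
    // Point-to-point queue that carries the visit-log messages
    @Bean
    public Queue logQueue() {
        return new ActiveMQQueue("app.log");
    }
}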
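Producer example:
import javax.jms.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class SysVisitLogServiceImpl implements ISysVisitLogService {
    private static final Logger log = LoggerFactory.getLogger(SysVisitLogServiceImpl.class);
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    private Queue logQueue;

    /* Insert a visit log via the queue */
    public void insertVisitLog(SysVisitLog sysVisitLog) {
        log.debug("insertVisitLog: request received, queueing visit log for insertion -- {}", sysVisitLog);
        // With the default message converter, SysVisitLog must implement Serializable
        jmsMessagingTemplate.convertAndSend(logQueue, sysVisitLog);
    }
}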
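When no custom message converter is configured, convertAndSend wraps a Serializable payload in a JMS ObjectMessage, so the object travels as Java-serialized bytes. That is why the payload class has to implement Serializable and why the consumer side has to trust its package. The following is a minimal, stdlib-only sketch of that round trip, not what ActiveMQ does internally; VisitLog is a hypothetical stand-in for SysVisitLog, whose fields the article does not show.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for SysVisitLog (fields are assumptions).
class VisitLog implements Serializable {
    private static final long serialVersionUID = 1L;
    final String url;
    final long visitedAt;

    VisitLog(String url, long visitedAt) {
        this.url = url;
        this.visitedAt = visitedAt;
    }
}

class SerializationSketch {
    // Producer side, roughly: object -> bytes.
    static byte[] toBytes(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Consumer side, roughly: bytes -> object.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("payload could not be deserialized", e);
        }
    }

    public static void main(String[] args) {
        VisitLog sent = new VisitLog("/index", 1651347900000L);
        VisitLog received = (VisitLog) fromBytes(toBytes(sent));
        System.out.println(received.url + " @ " + received.visitedAt);
    }
}
```

If VisitLog did not implement Serializable, writeObject would fail with a NotSerializableException; the same constraint applies to any object handed to the default converter.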
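Consumer example:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ConsumerListener {
    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);
    @Autowired
    SysVisitLogMapper visitLogMapper;

    // Invoked for every message that arrives on the app.log queue
    @JmsListener(destination = "app.log")
    @Transactional(rollbackFor = {Exception.class})
    public void insertVisitLog(SysVisitLog sysVisitLog) {
        int i = visitLogMapper.insertSelective(sysVisitLog);
        log.info("Consumer inserted visit log, rows affected: {} -- sysVisitLog: {}", i, sysVisitLog);
    }
}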
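The producer and consumer above follow the point-to-point queue model: each message sent to app.log is handed to exactly one consumer, even when several are listening. The sketch below imitates that hand-off in memory with a BlockingQueue. It is an analogy only and does not use ActiveMQ; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PointToPointSketch {
    // Sends all messages through one queue shared by consumerCount consumer threads
    // and returns the messages that were handled (order depends on thread scheduling).
    static List<String> deliver(List<String> messages, int consumerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());
        List<Thread> consumers = new ArrayList<>();

        for (int i = 0; i < consumerCount; i++) {
            Thread consumer = new Thread(() -> {
                try {
                    while (true) {
                        String message = queue.take(); // blocks until a message arrives
                        handled.add(message);          // only this consumer sees it
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // interrupted: stop consuming
                }
            });
            consumer.setDaemon(true);
            consumer.start();
            consumers.add(consumer);
        }

        queue.addAll(messages); // the "producer" side
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for consumers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            consumers.forEach(Thread::interrupt);
        }
        return new ArrayList<>(handled);
    }
}
```

Calling PointToPointSketch.deliver with three messages and two consumers returns exactly three handled messages with no duplicates, which is the property that distinguishes a queue from a publish/subscribe topic.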
【4】Using ActiveMQ in an older SSM project
In an older SSM (SpringMVC, Spring, MyBatis) project, ActiveMQ was configured mainly through XML, while the producer and consumer code used annotations just as above.
The ActiveMQ XML configuration is as follows:
<!-- Queue destination (point-to-point) -->
<bean id="InsertVisitLogQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>InsertVisitLogQueue</value>
    </constructor-arg>
</bean>
<!-- Pooled connection factory wrapping the ActiveMQ connection factory -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://127.0.0.1:61616" />
            <property name="userName" value="root" />
            <property name="password" value="123456" />
            <property name="useAsyncSend" value="true" />
            <property name="trustAllPackages" value="true"/>
        </bean>
    </property>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="receiveTimeout" value="2000" />
</bean>
<!-- Enables @JmsListener so listeners start automatically; the jms namespace must be declared on the root <beans> element -->
<jms:annotation-driven/>
<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>
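Compared with this XML setup, Spring Boot 2.0 removes most of the configuration: the connection factory, the JmsTemplate, and the listener container factory are auto-configured from the spring.activemq.* properties.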
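Producer example:
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ScheduledMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service
public class SysVisitLogServiceImpl implements ISysVisitLogService {
    private static final Logger log = LoggerFactory.getLogger(SysVisitLogServiceImpl.class);
    @Autowired
    private JmsTemplate jmsTemplate;
    @Qualifier("InsertVisitLogQueue")
    @Autowired
    private Destination destinationInsertVisitLogQueue;

    /* Insert into tb_sys_visit_log */
    @Override
    public void insertVisitLog(final SysVisitModel sysVisitModel) {
        log.debug("insertVisitLog: request received, queueing visit log for insertion -- {}", sysVisitModel);
        // Send on a separate thread so the caller is not blocked
        new Thread(new Runnable() {
            @Override
            public void run() {
                jmsTemplate.send(destinationInsertVisitLogQueue, new MessageCreator() {
                    @Override
                    public Message createMessage(Session session) throws JMSException {
                        // SysVisitModel must implement Serializable
                        ObjectMessage msg = session.createObjectMessage(sysVisitModel);
                        /* Deliver after one minute; requires schedulerSupport="true" on the broker */
                        long delay = 60 * 1000;
                        msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                        return msg;
                    }
                });
            }
        }).start();
    }
}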
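The AMQ_SCHEDULED_DELAY property asks the broker to hold the message and release it to consumers only after the delay has passed. To make that behaviour concrete, here is a small in-memory analogy built on java.util.concurrent.DelayQueue. It does not involve ActiveMQ, where the broker's scheduler does the holding, and DelayedMessage and DelaySketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// A message that becomes visible to consumers only after its delay has elapsed.
class DelayedMessage implements Delayed {
    final String body;
    private final long deliverAtNanos;

    DelayedMessage(String body, long delayMillis) {
        this.body = body;
        this.deliverAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deliverAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelaySketch {
    // Enqueues every message, then takes them all; returns the bodies in delivery order.
    static List<String> deliverAll(DelayedMessage... messages) {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        for (DelayedMessage message : messages) {
            queue.put(message);
        }
        List<String> delivered = new ArrayList<>();
        try {
            for (int i = 0; i < messages.length; i++) {
                delivered.add(queue.take().body); // blocks until the next delay expires
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return delivered;
    }
}
```

Messages come out in order of their due time rather than the order in which they were sent, so a message with a short delay overtakes one with a long delay that was queued earlier.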
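Consumer example:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ConsumerListener {
    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    // concurrency="10-20": at least 10 and at most 20 concurrent consumers
    @JmsListener(destination = "InsertVisitLogQueue", concurrency = "10-20")
    @Transactional(rollbackFor = {Exception.class})
    public void insertVisitLog(SysVisitModel sysVisitModel) {
        log.info("Consumer received sysVisitModel: {} on thread {}", sysVisitModel, Thread.currentThread().getName());
        //...
    }
}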
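The concurrency attribute of @JmsListener accepts either a "lower-upper" range such as "10-20" or a single number that serves as the upper limit, in which case the lower limit is 1. The sketch below shows how such a string breaks down into the two bounds. It is an illustrative re-implementation of the documented format, not Spring's own code, and ConcurrencyRange is a hypothetical name.

```java
// Illustrative parser for the "lower-upper" concurrency format (hypothetical helper).
class ConcurrencyRange {
    final int lower;
    final int upper;

    private ConcurrencyRange(int lower, int upper) {
        this.lower = lower;
        this.upper = upper;
    }

    // Accepts "10-20" (range) or "5" (upper limit only, lower limit defaults to 1).
    static ConcurrencyRange parse(String concurrency) {
        String value = concurrency.trim();
        int dash = value.indexOf('-');
        if (dash == -1) {
            return new ConcurrencyRange(1, Integer.parseInt(value));
        }
        int lower = Integer.parseInt(value.substring(0, dash).trim());
        int upper = Integer.parseInt(value.substring(dash + 1).trim());
        if (lower > upper) {
            throw new IllegalArgumentException("lower limit exceeds upper limit: " + concurrency);
        }
        return new ConcurrencyRange(lower, upper);
    }

    public static void main(String[] args) {
        ConcurrencyRange range = parse("10-20");
        System.out.println(range.lower + " to " + range.upper + " consumers");
    }
}
```

For the listener above, the container starts with 10 consumers and may scale up to 20 under load, which is why the log line prints the thread name.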
pom dependencies:
<!-- ActiveMQ -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>
ActiveMQ installation: https://blog.csdn.net/j080624/article/category/7358806