
Integrating ActiveMQ with Spring Boot 2.0

程序员文章站 2022-04-30 19:45:00

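【1】pom file

Add the ActiveMQ dependencies. The activemq-pool artifact backs the spring.activemq.pool.* connection pool settings in Spring Boot 2.0.x; from Spring Boot 2.1 onward the pool is provided by org.messaginghub:pooled-jms instead.

<!-- ActiveMQ integration -->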
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

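【2】yml configuration

The yml configuration file is as follows. pool.enabled turns on the pooled connection factory, and packages.trust-all allows object messages from any package to be deserialized, which is convenient for a demo but is better narrowed to specific packages in production: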

spring:
  activemq:
    user: root
    password: 123456
    broker-url: tcp://127.0.0.1:61616
    pool:
      enabled: true
      max-connections: 50
    packages:
      trust-all: true

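【3】Producer, consumer, and queue

The main application class is configured as follows:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

@SpringBootApplication
@EnableJms // enables processing of @JmsListener methods
public class HhProvinceApplication {

    public static void main(String[] args) {
        SpringApplication.run(HhProvinceApplication.class, args);
    }
}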

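MyActiveMQConfig is as follows:

import javax.jms.Queue;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyActiveMQConfig {

    // Point-to-point destination shared by the producer and the consumer
    @Bean
    public Queue logQueue() {
        return new ActiveMQQueue("app.log");
    }
}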

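Producer example:

import javax.jms.Queue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class SysVisitLogServiceImpl implements ISysVisitLogService {

    private static final Logger log = LoggerFactory.getLogger(SysVisitLogServiceImpl.class);

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue logQueue;

    /* Insert a visit log by putting it on the queue */
    public void insertVisitLog(SysVisitLog sysVisitLog) {
        log.debug("insertVisitLog: request received, queueing visit log for insertion -- {}", sysVisitLog);
        // The payload is converted to a JMS message and sent to the "app.log" queue
        jmsMessagingTemplate.convertAndSend(logQueue, sysVisitLog);
    }
}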
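
With the default message converter, a plain object payload such as sysVisitLog is sent as a JMS ObjectMessage, which relies on Java serialization. The payload class therefore needs to implement Serializable, and the consumer side must trust its package (hence packages.trust-all in the yml above). The following is a minimal JDK-only sketch of that round trip. VisitLogPayload and its two fields are hypothetical stand-ins, since the article does not show the fields of SysVisitLog.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for SysVisitLog; the real fields are not shown in the article.
class VisitLogPayload implements Serializable {
    private static final long serialVersionUID = 1L;

    final String url;
    final long visitedAt;

    VisitLogPayload(String url, long visitedAt) {
        this.url = url;
        this.visitedAt = visitedAt;
    }
}

class SerializationRoundTrip {

    // Serializes and deserializes the payload, roughly what happens
    // between the producer and the consumer of an ObjectMessage.
    static VisitLogPayload roundTrip(VisitLogPayload payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(payload);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (VisitLogPayload) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // A class that is not Serializable would end up here with NotSerializableException
            throw new IllegalStateException("Payload could not be serialized", e);
        }
    }

    public static void main(String[] args) {
        VisitLogPayload received = roundTrip(new VisitLogPayload("/index", 1651347900000L));
        System.out.println(received.url + " " + received.visitedAt);
    }
}
```

If the payload class did not implement Serializable, the write step would fail, which is the same kind of failure to expect when sending such an object through the queue.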

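Consumer example:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @Autowired
    SysVisitLogMapper visitLogMapper;

    // Invoked for every message that arrives on the "app.log" queue
    @JmsListener(destination = "app.log")
    @Transactional(rollbackFor = {Exception.class})
    public void insertVisitLog(SysVisitLog sysVisitLog) {
        int i = visitLogMapper.insertSelective(sysVisitLog);
        log.info("Consumer inserted visit log, rows affected: {} -- sysVisitLog: {}", i, sysVisitLog);
    }
}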

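【4】Using ActiveMQ under SSM previously

Previously, under SSM (SpringMVC, Spring, MyBatis), ActiveMQ was configured mainly through XML, while the producer and consumer code used annotations in the same way.

The ActiveMQ XML configuration is as follows: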

    <!-- Queue destination (point-to-point) -->
    <bean id="InsertVisitLogQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>InsertVisitLogQueue</value>
        </constructor-arg>
    </bean>

    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616" />
                <property name="userName" value="root" />
                <property name="password" value="123456" />
                <property name="useAsyncSend" value="true" />
                <property name="trustAllPackages" value="true" />
            </bean>
        </property>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="receiveTimeout" value="2000" />
    </bean>

    <!-- Enables @JmsListener so that listeners are started automatically -->
    <jms:annotation-driven/>

    <bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

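Compared with this XML setup, Spring Boot 2.0 removes much of the configuration: the connection factory, the JmsTemplate, and the listener container factory are auto-configured from the yml properties.


Producer example: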

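import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ScheduledMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service
public class SysVisitLogServiceImpl implements ISysVisitLogService {

    private static final Logger log = LoggerFactory.getLogger(SysVisitLogServiceImpl.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Qualifier("InsertVisitLogQueue")
    @Autowired
    private Destination destinationInsertVisitLogQueue;

    /* Insert into tb_sys_visit_log */
    @Override
    public void insertVisitLog(final SysVisitModel sysVisitModel) {
        log.debug("insertVisitLog: request received, queueing visit log for insertion -- {}", sysVisitModel);
        // Send on a separate thread so that the caller is not blocked
        new Thread(new Runnable() {
            @Override
            public void run() {
                jmsTemplate.send(destinationInsertVisitLogQueue, new MessageCreator() {
                    @Override
                    public Message createMessage(Session session) throws JMSException {
                        // SysVisitModel must implement Serializable to be carried in an ObjectMessage
                        ObjectMessage msg = session.createObjectMessage(sysVisitModel);
                        /* Deliver the message, and so insert the visit log, one minute later.
                           Delayed delivery requires schedulerSupport="true" on the broker. */
                        long delay = 60 * 1000;
                        msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                        return msg;
                    }
                });
            }
        }).start();
    }
}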
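
The producer above starts a new thread for every message. One possible alternative, sketched here with JDK classes only, is to hand each send to a shared fixed-size pool so that threads are reused. The class name AsyncSendSketch, the pool size of 4, and the in-memory queue that stands in for the jmsTemplate.send(...) call are all illustrative assumptions, not part of the original code.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncSendSketch {

    // Shared pool reused across calls instead of one new Thread per message (size is an assumption)
    private final ExecutorService sendPool = Executors.newFixedThreadPool(4);

    // Stand-in for the broker: in real code the task body would call jmsTemplate.send(...)
    private final Queue<String> sent = new ConcurrentLinkedQueue<>();

    // Returns immediately; the actual "send" runs on a pool thread
    void submitSend(final String payload) {
        sendPool.execute(new Runnable() {
            @Override
            public void run() {
                sent.add(payload);
            }
        });
    }

    // Stops accepting work, waits for queued sends to finish, and reports how many were sent
    int shutdownAndCount() {
        sendPool.shutdown();
        try {
            if (!sendPool.awaitTermination(5, TimeUnit.SECONDS)) {
                sendPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            sendPool.shutdownNow();
        }
        return sent.size();
    }

    public static void main(String[] args) {
        AsyncSendSketch sketch = new AsyncSendSketch();
        for (int i = 0; i < 10; i++) {
            sketch.submitSend("visit-log-" + i);
        }
        System.out.println("sent: " + sketch.shutdownAndCount());
    }
}
```

Since the XML configuration already sets useAsyncSend to true, it may also be worth measuring whether the extra thread is needed at all before adopting either approach.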

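Consumer example:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @JmsListener(destination = "InsertVisitLogQueue", concurrency = "10-20")
    @Transactional(rollbackFor = {Exception.class})
    public void insertVisitLog(SysVisitModel sysVisitModel) {
        log.info("Consumer received sysVisitModel: {} on thread {}", sysVisitModel, Thread.currentThread().getName());
        //...
    }
}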
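
The concurrency attribute on @JmsListener takes either a "lower-upper" string such as "10-20" or a single upper limit such as "20", in which case the lower limit is 1. The sketch below illustrates how such a value maps to a minimum and maximum number of concurrent consumers. ConcurrencyRange is a hypothetical helper written for this illustration, not a Spring class.

```java
// Hypothetical helper that interprets a @JmsListener-style concurrency string.
class ConcurrencyRange {

    final int min;
    final int max;

    ConcurrencyRange(int min, int max) {
        this.min = min;
        this.max = max;
    }

    // "10-20" -> min 10, max 20; "20" -> min 1, max 20
    static ConcurrencyRange parse(String concurrency) {
        int separator = concurrency.indexOf('-');
        if (separator != -1) {
            int lower = Integer.parseInt(concurrency.substring(0, separator).trim());
            int upper = Integer.parseInt(concurrency.substring(separator + 1).trim());
            return new ConcurrencyRange(lower, upper);
        }
        return new ConcurrencyRange(1, Integer.parseInt(concurrency.trim()));
    }

    public static void main(String[] args) {
        ConcurrencyRange range = parse("10-20");
        System.out.println("min=" + range.min + ", max=" + range.max);
    }
}
```

With "10-20", the listener container starts with 10 consumers and can scale up to 20 under load, which is why the consumer above logs the thread name.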

pom dependencies:

<!-- activeMQ -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

ActiveMQ installation: https://blog.csdn.net/j080624/article/category/7358806