A Simple Spring Boot Integration with ActiveMQ

程序员文章站 2022-03-02 18:14:49

Implementing the message sender

Add the dependencies to pom.xml

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
        <groupId>org.messaginghub</groupId>
        <artifactId>pooled-jms</artifactId>
</dependency>

Test code

@Autowired
JmsMessagingTemplate template;
@Test
public void contextLoads() {
    template.convertAndSend("tym_002","hello_02");
}

Implementing the message receiver

The pom.xml dependencies are the same as for the sender.

Create the JmsListener

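// Counts failed attempts. A plain int is enough for this demo but is not thread-safe.
int a = 0;

@JmsListener(destination = "tym_002", containerFactory = "jmsQueryListenerFactory")
public void testreceiveQueue(TextMessage textMessage, Session session) throws JMSException {
    try {
        // Deliberately throws ArithmeticException to simulate a processing failure.
        int i = 1 / 0;
        System.out.println("Received: " + textMessage.getText());
        // Acknowledge manually, only after processing has succeeded.
        textMessage.acknowledge();
    } catch (Exception e) {
        a++;
        System.out.println("Retry count: " + a);
        // Request redelivery of the unacknowledged message.
        session.recover();
    }
}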
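
The listener keeps its retry count in a plain int field, while the container factory used in this article allows up to 10 concurrent consumers ("1-10"), so the field may be incremented from several threads at once. The following is a minimal sketch, using only the JDK, of a counter that stays correct in that situation. The class name RetryCounterSketch and the helper simulate are hypothetical and exist only for illustration; they are not part of Spring or ActiveMQ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryCounterSketch {

    // Thread-safe replacement for a plain "int a" field.
    private final AtomicInteger retries = new AtomicInteger();

    // Intended to be called from a listener's catch block; returns the updated count.
    public int recordRetry() {
        return retries.incrementAndGet();
    }

    public int current() {
        return retries.get();
    }

    // Hypothetical helper: performs `calls` increments spread over `threads` worker threads
    // and returns the final count.
    public static int simulate(int threads, int calls) {
        RetryCounterSketch counter = new RetryCounterSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < calls; i++) {
            pool.execute(() -> counter.recordRetry());
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.current();
    }

    public static void main(String[] args) {
        // 10 threads mirror the upper bound of the "1-10" concurrency setting.
        System.out.println(simulate(10, 1000)); // prints 1000
    }
}
```

In the listener, this would amount to replacing a++ with a call such as recordRetry() and printing its return value.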

The containerFactory attribute refers to a bean defined in a custom configuration class:

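import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class ActiveMqConfig {

    // Redelivery policy: up to 3 redeliveries with exponential back-off.
    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setUseExponentialBackOff(true);
        redeliveryPolicy.setMaximumRedeliveries(3);
        redeliveryPolicy.setInitialRedeliveryDelay(1000L);
        redeliveryPolicy.setBackOffMultiplier(2);
        // -1 means the delay has no upper bound.
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        return redeliveryPolicy;
    }

    // Connection factory that applies the redelivery policy above.
    @Bean
    public ActiveMQConnectionFactory getActiveMQConnectionFactory(
            RedeliveryPolicy redeliveryPolicy,
            @Value("${spring.activemq.broker-url}") String url) {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return activeMQConnectionFactory;
    }

    @Bean
    public JmsTemplate getJmsTemplate(ActiveMQConnectionFactory factory) {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(factory);
        return jmsTemplate;
    }

    // Referenced by @JmsListener(containerFactory = "jmsQueryListenerFactory").
    @Bean(name = "jmsQueryListenerFactory")
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            ActiveMQConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory);
        // Between 1 and 10 concurrent consumers.
        factory.setConcurrency("1-10");
        factory.setRecoveryInterval(1000L);
        // 4 is the value of ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE.
        factory.setSessionAcknowledgeMode(4);
        return factory;
    }
}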

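factory.setSessionAcknowledgeMode(4) selects ActiveMQ's individual-acknowledge mode (4 is the value of ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE), so the listener must acknowledge each message manually. With automatic acknowledgment, a message can be treated as consumed even though the listener threw an exception while processing it, and the message would then be lost.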
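
To make the redelivery settings in ActiveMqConfig more concrete, the sketch below computes the approximate wait before each redelivery for the values used in this article (initial delay 1000 ms, multiplier 2, at most 3 redeliveries, no maximum delay). It uses only the JDK and is an approximation of exponential back-off, not ActiveMQ's own implementation. The class RedeliveryScheduleSketch and its delays method are hypothetical names, and optional features such as collision avoidance are ignored.

```java
import java.util.ArrayList;
import java.util.List;

public class RedeliveryScheduleSketch {

    // Returns the approximate delay (in ms) before each redelivery.
    // A maximumDelayMs of -1 is treated as "no upper bound", mirroring the config above.
    public static List<Long> delays(long initialDelayMs, double multiplier,
                                    int maximumRedeliveries, long maximumDelayMs) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int redelivery = 1; redelivery <= maximumRedeliveries; redelivery++) {
            result.add(delay);
            // Grow the delay for the next redelivery, capping it if a maximum is set.
            long next = (long) (delay * multiplier);
            if (maximumDelayMs != -1 && next > maximumDelayMs) {
                next = maximumDelayMs;
            }
            delay = next;
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the redeliveryPolicy() bean in this article.
        System.out.println(delays(1000L, 2.0, 3, -1L)); // prints [1000, 2000, 4000]
    }
}
```

Under these assumptions the listener is invoked four times in total: the initial delivery plus three redeliveries, roughly 1, 2, and 4 seconds apart.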