Springboot简单集成ActiveMQ
消息发送者的实现
pom.xml添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
测试代码
@Autowired
JmsMessagingTemplate template;
@Test
public void contextLoads() {
template.convertAndSend("tym_002","hello_02");
}
消息接受者的实现
pom文件依赖和发送者一样
创建JmsListener
int a=0;
@JmsListener(destination = "tym_002",containerFactory = "jmsQueryListenerFactory")
public void testreceiveQueue(TextMessage textMessage, Session session) throws JMSException {
try {
int i=1/0;
System.out.println("接受到:"+textMessage.getText());
textMessage.acknowledge();
} catch (Exception e) {
a++;
System.out.println("重试了 :"+a);
session.recover();
}
}
其中containerFactory的赋值为自定义的配置类:
@Configuration
public class ActiveMqConfig {
@Bean
public RedeliveryPolicy redeliveryPolicy(){
RedeliveryPolicy redeliveryPolicy=new RedeliveryPolicy();
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(3);
redeliveryPolicy.setInitialRedeliveryDelay(1000L);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
return redeliveryPolicy;
}
@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory(@Autowired RedeliveryPolicy redeliveryPolicy,@Value("${spring.activemq.broker-url}") String url){
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(url);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
@Bean
public JmsTemplate getJmsTemplate(@Autowired ActiveMQConnectionFactory factory){
JmsTemplate jmsTemplate=new JmsTemplate();
jmsTemplate.setConnectionFactory(factory);
return jmsTemplate;
}
@Bean(name="jmsQueryListenerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(@Autowired ActiveMQConnectionFactory activeMQConnectionFactory){
DefaultJmsListenerContainerFactory factory=new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory);
factory.setConcurrency("1-10");
factory.setRecoveryInterval(1000L);
factory.setSessionAcknowledgeMode(4);
return factory;
}
}
其中factory.setSessionAcknowledgeMode(4)设置4是手动接受消息,防止在接受自动消息时,抛出异常导致消息无效。