https://blog.csdn.net/qincidong/article/details/76114434
SpringBoot集成activeMQ
1.添加依赖:
<!-- activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
2.在application.properties中加入activemq的配置
spring.activemq.broker-url=tcp://192.168.74.135:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring.activemq.pool.expiry-timeout=10000
spring.activemq.pool.idle-timeout=30000
- 1
- 2
- 3
- 4
- 5
- 6
- 7
3.创建一个消息生产者
@Component
public class JMSProducer {
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage(Destination destination,String message) {
this.jmsTemplate.convertAndSend(destination,message);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
4.创建一个消息消费者
@Component
public class JMSConsumer {
private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);
@JmsListener(destination = "springboot.queue.test")
public void receiveQueue(String msg) {
logger.info("接收到消息:{}",msg);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
5.测试类
public class JmsTest extends BaseTest{
@Autowired
private JMSProducer jmsProducer;
@Test
public void testJms() {
Destination destination = new ActiveMQQueue("springboot.queue.test");
for (int i=0;i<10;i++) {
jmsProducer.sendMessage(destination,"hello,world!" + i);
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
BaseTest代码如下:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = com.sample.activity.web.Application.class)
public abstract class BaseTest {
}
- 1
- 2
- 3
- 4
6.发送和接收TOPIC消息
默认只能发送和接收queue消息,如果要发送和接收topic消息,需要在application.properties文件中加入:
spring.jms.pub-sub-domain=true
- 1
发送和接收的代码同queue一样。
但是这样有另外一个问题:无法发送和接收queue消息。那么如何同时支持发送和接收queue/topic消息呢?
7.支持同时发送和接收queue/topic
i. 新建一个JMS的配置类:
@Configuration
public class JmsConfig {
public final static String TOPIC = "springboot.topic.test";
public final static String QUEUE = "springboot.queue.test";
@Bean
public Queue queue() {
return new ActiveMQQueue(QUEUE);
}
@Bean
public Topic topic() {
return new ActiveMQTopic(TOPIC);
}
// topic模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
// queue模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
ii. 消息消费者的代码改成如下:
@Component
public class JMSConsumer {
private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);
@JmsListener(destination = JmsConfig.TOPIC,containerFactory = "jmsListenerContainerTopic")
public void onTopicMessage(String msg) {
logger.info("接收到topic消息:{}",msg);
}
@JmsListener(destination = JmsConfig.QUEUE,containerFactory = "jmsListenerContainerQueue")
public void onQueueMessage(String msg) {
logger.info("接收到queue消息:{}",msg);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
可以看到,这里指定了ConnectionFactory。
iii. 测试类:
public class JmsTest extends BaseTest{
@Autowired
private JMSProducer jmsProducer;
@Autowired
private Topic topic;
@Autowired
private Queue queue;
@Test
public void testJms() {
for (int i=0;i<10;i++) {
jmsProducer.sendMessage(queue,"queue,world!" + i);
jmsProducer.sendMessage(topic, "topic,world!" + i);
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
springboot中activemq的一些配置属性参考:springboot activemq配置属性