springBoot集成ActiveMQ
程序员文章站
2022-04-30 19:45:54
...
springBoot集成ActiveMQ
1. 修改 pom.xml 文件,引入 spring-boot-starter-activemq 以及 activemq-pool 依赖
<!-- Spring Boot starter for ActiveMQ; no version is given here, so it must be supplied elsewhere in the build. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- ActiveMQ connection pooling library, pinned explicitly to 5.15.9. -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.9</version>
</dependency>
2. 修改application.yml文件
增加:
# Broker connection settings, read through the ${spring.activemq.*} placeholders.
spring:
  activemq:
    user: admin
    password: admin
    broker-url: tcp://192.168.2.113:61616
    pool:
      enabled: true
      max-connections: 10

# Destination names; kept at the top level because they are read as
# ${queueName} and ${topicName}.
queueName: publish.queue
topicName: publish.topic
3. 新建配置包,并在其中增加配置类 ActiveMQConfig
/**
 * JMS configuration for ActiveMQ.
 *
 * <p>Exposes the queue and topic destinations, a broker connection factory, and
 * one listener container factory per messaging domain (queue and topic).
 */
@Configuration
public class ActiveMQConfig {

    /** Broker address, read from {@code spring.activemq.broker-url}. */
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    /** Broker login name, read from {@code spring.activemq.user}. */
    @Value("${spring.activemq.user}")
    private String username;

    /** Broker password, read from {@code spring.activemq.password}. */
    @Value("${spring.activemq.password}")
    private String password;

    /** Name of the queue destination, read from {@code queueName}. */
    @Value("${queueName}")
    private String queueName;

    /** Name of the topic destination, read from {@code topicName}. */
    @Value("${topicName}")
    private String topicName;

    /**
     * Queue destination bean.
     *
     * @return an ActiveMQ queue named after the {@code queueName} property
     */
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(queueName);
    }

    /**
     * Topic destination bean.
     *
     * @return an ActiveMQ topic named after the {@code topicName} property
     */
    @Bean
    public Topic topic() {
        return new ActiveMQTopic(topicName);
    }

    /**
     * Connection factory built from the configured user, password and broker URL.
     *
     * <p>NOTE(review): declaring this bean presumably makes Spring Boot's own
     * auto-configured connection factory back off, in which case the
     * {@code spring.activemq.pool.*} settings would not apply - confirm.
     *
     * @return a plain ActiveMQ connection factory
     */
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(username, password, brokerUrl);
    }

    /**
     * Listener container factory for queue destinations.
     *
     * @param connectionFactory the broker connection factory
     * @return a listener container factory left in its default domain
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
        return listenerFactoryFor(connectionFactory);
    }

    /**
     * Listener container factory for topic destinations.
     *
     * @param connectionFactory the broker connection factory
     * @return a listener container factory switched to publish/subscribe
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory topicFactory = listenerFactoryFor(connectionFactory);
        // Switch to publish/subscribe mode; by default the producer/consumer (queue) mode is used.
        topicFactory.setPubSubDomain(true);
        return topicFactory;
    }

    /** Creates a listener container factory bound to the given connection factory. */
    private static DefaultJmsListenerContainerFactory listenerFactoryFor(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}
注意:ActiveMQQueue、ActiveMQTopic、ActiveMQConnectionFactory 等实现类均从 org.apache.activemq 包(及其子包)引入;Queue、Topic 接口则来自 javax.jms 包。
4. 测试
4.1 controller
/**
 * REST endpoints that publish demo messages to the injected queue and topic,
 * plus a listener that prints whatever arrives on {@code out.queue}.
 */
@RestController
@RequestMapping("/publish")
public class PublishController {

    /** Number of messages sent by each endpoint call. */
    private static final int MESSAGE_COUNT = 10;

    @Autowired
    private JmsMessagingTemplate jms;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    /**
     * Sends {@code queue0} .. {@code queue9} to the injected queue.
     *
     * @return a fixed confirmation string
     */
    @RequestMapping("/queue")
    public String queue() {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            jms.convertAndSend(queue, "queue" + i);
        }
        return "queue 发送成功";
    }

    /**
     * Prints each message received on {@code out.queue}, the destination that
     * {@code QueueListener} names in its {@code @SendTo}.
     *
     * <p>The container factory is named explicitly, matching {@code QueueListener},
     * so the listener does not rely on a default factory bean being present.
     *
     * @param msg the message payload
     */
    @JmsListener(destination = "out.queue", containerFactory = "jmsListenerContainerQueue")
    public void consumerMsg(String msg) {
        System.out.println(msg);
    }

    /**
     * Sends {@code topic0} .. {@code topic9} to the injected topic.
     *
     * @return a fixed confirmation string
     */
    @RequestMapping("/topic")
    public String topic() {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            jms.convertAndSend(topic, "topic" + i);
        }
        return "topic 发送成功";
    }
}
如需把消息发送到自定义队列,可将发送语句改为:jms.convertAndSend(new ActiveMQQueue("test.queue"), "queue" + i);
其中 new ActiveMQQueue("test.queue") 用于指定自定义的队列名称。
然后在 QueueListener 中通过 @JmsListener(destination = "test.queue", containerFactory = "jmsListenerContainerQueue") 监听该队列并接收消息。
4.2 listener
/**
 * Listens on {@code publish.queue} and sends an acknowledgement for every
 * message to {@code out.queue}.
 */
@Component
public class QueueListener {

    /**
     * Prints the incoming message and builds the reply for it.
     *
     * @param text the received message payload
     * @return the acknowledgement text, routed to {@code out.queue} via {@code @SendTo}
     */
    @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
    @SendTo("out.queue")
    public String receive(String text) {
        final String logLine = "QueueListener: consumer-a 收到一条信息: " + text;
        System.out.println(logLine);

        final String reply = "consumer-a received : " + text;
        return reply;
    }
}
默认情况下,destination = "publish.queue" 即 application.yml 中 queueName 所配置的值。