Springboot 集成activemq 点对点
程序员文章站
2022-04-30 19:47:12
...
Springboot集成activemq
点对点
目前只适用于字符串,对象,map集合可以正常使用
引入依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.9.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--消息队列连接池-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.5</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.4</version>
</dependency>
</dependencies>
application.yml
spring:
activemq:
#ActiveMQ通讯地址
broker-url: tcp://localhost:61616
#用户名
user: admin
#密码
password: admin
#是否启用内存模式(就是不安装MQ,项目启动时同时启动一个MQ实例)
in-memory: false
packages:
#信任所有的包
trust-all: true
pool:
#是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
enabled: false
jms:
pub-sub-domain: true
启动类:
package com.activemq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
@SpringBootApplication
@EnableJms //启动消息队列
public class ActivemqdemoApplication {
public static void main(String[] args) {
SpringApplication.run(ActivemqdemoApplication.class, args);
}
}
实体类:
package com.activemq.bean;
import java.io.Serializable;
/**
* Created by MengMo on 2019/12/6.
*/
public class Users implements Serializable{//实现序列化
private Integer uid;
private String uname;
private String pwd;
@Override
public String toString() {
return "Users{" +
"uid=" + uid +
", uname='" + uname + '\'' +
", pwd='" + pwd + '\'' +
'}';
}
public Users() {
}
public Users(Integer uid, String uname, String pwd) {
this.uid = uid;
this.uname = uname;
this.pwd = pwd;
}
public Integer getUid() {
return uid;
}
public void setUid(Integer uid) {
this.uid = uid;
}
public String getUname() {
return uname;
}
public void setUname(String uname) {
this.uname = uname;
}
public String getPwd() {
return pwd;
}
public void setPwd(String pwd) {
this.pwd = pwd;
}
}
生产者工具类:
package com.activemq.controller;
import com.activemq.bean.Users;
import com.alibaba.fastjson.JSONObject;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.stereotype.Service;
import javax.jms.*;
@Service
public class ProducerUtil {
/**
* 传递对象
* @param users
*/
public void sendObject(Object obj){
try {
//创建连接工厂,三个参数分别是用户名、密码以及消息队列所在地址
ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
//连接到JMS提供者
Connection conn = connFactory.createConnection();
//开启连接
conn.start();
//事务性会话,自动确认消息
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//消息的目的地,创建队列"queue"
Destination destination = session.createQueue("queue");
//消息生产者
MessageProducer producer = session.createProducer(destination);
//键值对消息
MapMessage mapMessage = session.createMapMessage();
//转成json字符串
String s = JSONObject.toJSONString(obj);
//将消息内容放入到消息里
mapMessage.setString("object", s);
//生产者传送消息
producer.send(mapMessage);
session.commit(); //提交会话,该条消息会进入"queue"队列,生产者也完成了历史使命
producer.close();
session.close();
conn.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
消费者
package com.activemq.controller;
import com.alibaba.fastjson.JSONObject;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.jms.*;
import java.util.Date;
/**
* Created by MengMo on 2019/12/6.
*/
@Component
public class ComsumerUtil implements ApplicationRunner{
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
into();
}
public void into() throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//与生产者的消息目的地相同
Destination dest = session.createQueue("queue");
MessageConsumer messConsumer = session.createConsumer(dest);
//方式1设置消息监听
messConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
MapMessage m = (MapMessage)message;
Thread.sleep(2000);
System.out.println("consumer接收到"+m.getString("object"));
}catch (Exception e){
e.printStackTrace();
}
}
});
}
}
控制层:
package com.activemq.controller;
import com.activemq.bean.Users;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by MengMo on 2019/12/6.
*/
@RestController
public class ActiveMqController {
@Autowired
private ProducerUtil producer;
@Autowired
private TopicProducter topicProducter;
/**
* 对象
* @param msg
*/
@RequestMapping("/sendobject")
public void sendobject(Users msg){
Users jack = new Users(1, "jack", "123");
producer.sendObject(jack);
}
/**
* 字符串
* @param msg
*/
@RequestMapping("/sendstring")
public void sendstring(String msg){
producer.sendObject("hello");
}
/**
* map集合
* @param userMap
*/
@RequestMapping("/sendlist")
public void sendlist(Map<String,Object> userMap){
List<Users> usersList = new ArrayList<>();
for (int i = 1; i <5 ; i++) {
Users user = new Users(i, "jack"+i, "123"+i);
usersList.add(user);
}
HashMap<String, Object> map = new HashMap<>();
map.put("usersList",usersList);
producer.sendObject(map);
}
/**
*发布订阅
* @param msg
* @return
*/
/*@RequestMapping("/publish")
public String publish(String msg){
topicProducter.sendMessage(msg);
return "消息已发送"+msg;
}*/
}
发布订阅
生产者
package com.activemq.controller;
import com.alibaba.fastjson.JSONObject;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsMessagingTemplate;
/**
*订阅模式的生产者
**/
@Configuration
public class TopicProducter {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* @Description 将接受到的消息及消息模式(topic或queue)放到队列里面,然后消费
* 者只需要正确的添加注解@JmsListener(destination = "目的地"),监听队列消息就会主动获取
* @Param destination 目的地
* @Param msg 消息
* @Date 2019/3/21 14:46
*/
public void sendMessage(Object msg) {
ActiveMQTopic destination = new ActiveMQTopic("topic-my");
//将传过来的转换为JSON字符串传入mq中
String s = JSONObject.toJSONString(msg);
//发送至activemq
jmsMessagingTemplate.convertAndSend(destination, s);
}
}
消费者
package com.activemq.controller;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* 订阅模式消费者
*/
@Component
public class TopicCustomer {
/**
* 创建2个消费者
* @param text
*/
//监听mq发过来的消息
@JmsListener(destination = "topic-my")
public void subscriber(String text) throws InterruptedException {
//守护线程休眠时间设置单位为秒
Thread.sleep(2000);
System.out.println("消费者1111111111111111111111消费+"+text);
}
@JmsListener(destination = "topic-my")
public void subscriber1(String text) throws InterruptedException {
Thread.sleep(2000);
System.out.println("消费者2222222222222222222222消费+"+text);
}
}
在这里为了测试一下我只用了一个类进行操作,如果想要使用多个只需要再创建一个即可。
希望可以帮助到您,大神勿喷,谢谢!!!