activemq发布订阅模式消息
程序员文章站
2022-03-24 12:02:42
...
activemq发布订阅模式消息
import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicProducter {
// 发送次数
public static final int SEND_NUM = 10;
// tcp 地址 服务器器端地址
//public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.1.128:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "jd.mq.topic";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
/**
* 消息发送端
* @param session
* @param publisher
* @throws Exception
*/
public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
String message = "发送消息第" + (i + 1) + "条";
TextMessage textMessage = session.createTextMessage(message);
System.out.println(textMessage.getText());
//发送 Topic消息
publisher.send(textMessage);
}
}
public void run() throws Exception {
//Topic连接
TopicConnection connection = null;
//Topic会话
TopicSession session = null;
try {
// 1、创建链接工厂
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicProducter.DEFAULT_USER, TopicProducter.DEFAULT_PASSWORD, TopicProducter.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createTopicConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Topic topic = session.createTopic(DESTINATION);
// 6、创建消息发送者
TopicPublisher publisher = session.createPublisher(topic);
// 设置持久化模式
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, publisher);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
new TopicProducter().run();
}
}
第一种消费消息的方式
public class TopicReceiver {
// tcp 地址 服务器器端地址
// public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.1.128:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "jd.mq.topic";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 1、创建链接工厂
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicReceiver.DEFAULT_USER, TopicReceiver.DEFAULT_PASSWORD, TopicReceiver.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createTopicConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Topic topic = session.createTopic(DESTINATION);
// 6、创建消息制作者
final TopicSubscriber subscriber = session.createSubscriber(topic);
//接收Topic生产者发送过来的消息
//需要注意的是此处需要启动一个新的线程来处理问题
new Thread(){
public void run(){
TextMessage textMessage = null;
try {
while(true){//持续接收消息
textMessage = (TextMessage) subscriber.receive();
if(textMessage==null)
break;
System.out.println("接收#" + textMessage.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}.start();
// 休眠100s再关闭 接收生产者发送的全部的10条消息
// 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
// 也就是TopicReceiver_Receive这个类进入休眠状态了,而接收者.start方法刚刚启动的新线程会继续执行的哦。
Thread.sleep(1000 *100);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicReceiver.run();
}
}
第二种消费消息的方式:监听器
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
//使用监听器的方式订阅消息
public class TopicListener {
// tcp 地址 服务器器端地址
//public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.1.128:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "jd.mq.topic";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 1、创建链接工厂
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicListener.DEFAULT_USER, TopicListener.DEFAULT_PASSWORD, TopicListener.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createTopicConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Topic topic = session.createTopic(DESTINATION);
// 6、创建消息制作者
TopicSubscriber subscriber = session.createSubscriber(topic);
//使用监听器的方式订阅消息
subscriber.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg != null) {
TextMessage textMessage = (TextMessage) msg;
try {
System.out.println("接收#" + textMessage.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
// 休眠100s再关闭 接收生产者发送的全部的10条消息
// 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
// 也就是TopicReceiver_Listener这个类进入休眠状态了,而接收者的监听器仍然会继续执行的哦。
Thread.sleep(1000 *100);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicListener.run();
}
}
import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicProducter {
// 发送次数
public static final int SEND_NUM = 10;
// tcp 地址 服务器器端地址
//public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.1.128:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "jd.mq.topic";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
/**
* 消息发送端
* @param session
* @param publisher
* @throws Exception
*/
public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
String message = "发送消息第" + (i + 1) + "条";
TextMessage textMessage = session.createTextMessage(message);
System.out.println(textMessage.getText());
//发送 Topic消息
publisher.send(textMessage);
}
}
public void run() throws Exception {
//Topic连接
TopicConnection connection = null;
//Topic会话
TopicSession session = null;
try {
// 1、创建链接工厂
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicProducter.DEFAULT_USER, TopicProducter.DEFAULT_PASSWORD, TopicProducter.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createTopicConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Topic topic = session.createTopic(DESTINATION);
// 6、创建消息发送者
TopicPublisher publisher = session.createPublisher(topic);
// 设置持久化模式
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, publisher);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
new TopicProducter().run();
}
}
第一种消费消息的方式
public class TopicReceiver {
// tcp 地址 服务器器端地址
// public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.1.128:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "jd.mq.topic";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 1、创建链接工厂
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicReceiver.DEFAULT_USER, TopicReceiver.DEFAULT_PASSWORD, TopicReceiver.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createTopicConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Topic topic = session.createTopic(DESTINATION);
// 6、创建消息制作者
final TopicSubscriber subscriber = session.createSubscriber(topic);
//接收Topic生产者发送过来的消息
//需要注意的是此处需要启动一个新的线程来处理问题
new Thread(){
public void run(){
TextMessage textMessage = null;
try {
while(true){//持续接收消息
textMessage = (TextMessage) subscriber.receive();
if(textMessage==null)
break;
System.out.println("接收#" + textMessage.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}.start();
// 休眠100s再关闭 接收生产者发送的全部的10条消息
// 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
// 也就是TopicReceiver_Receive这个类进入休眠状态了,而接收者.start方法刚刚启动的新线程会继续执行的哦。
Thread.sleep(1000 *100);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicReceiver.run();
}
}
第二种消费消息的方式:监听器
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
//使用监听器的方式订阅消息
public class TopicListener {
// tcp 地址 服务器器端地址
//public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.1.128:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/topics.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "jd.mq.topic";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 1、创建链接工厂
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicListener.DEFAULT_USER, TopicListener.DEFAULT_PASSWORD, TopicListener.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createTopicConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Topic topic = session.createTopic(DESTINATION);
// 6、创建消息制作者
TopicSubscriber subscriber = session.createSubscriber(topic);
//使用监听器的方式订阅消息
subscriber.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg != null) {
TextMessage textMessage = (TextMessage) msg;
try {
System.out.println("接收#" + textMessage.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
// 休眠100s再关闭 接收生产者发送的全部的10条消息
// 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
// 也就是TopicReceiver_Listener这个类进入休眠状态了,而接收者的监听器仍然会继续执行的哦。
Thread.sleep(1000 *100);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicListener.run();
}
}