消息队列MQ实践----实现Queue(队列消息)和Topic(主题消息)两种模式
之前有篇文件介绍了生产消费者模式(http://blog.csdn.net/canot/article/details/51541920),当时是通过BlockingQueue阻塞队列来实现,以及在Redis中使用pub/sub模式(http://blog.csdn.net/canot/article/details/51938955)。而实际项目中往往是通过JMS使用消息队列来实现这两种模式的。
JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
JMS类似与JDBC,sun提供接口,由各个厂商(provider)来进行具体的实现。市面上众多成熟的JMS规范实现的框架Kafk,RabbitMQ,ActiveMQ,ZeroMQ,RocketMQ等。
JMS的队列消息(Queue)传递过程如下图:
对于Queue模式,一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了10个消息,两个接收者A,B那就是A,B总共会收到10条消息,不重复。
JMS的主题消息传递过程如下图:
对于Topic模式,一个发布者发布消息,有两个接收者A,B来订阅,那么发布了10条消息,A,B各收到10条消息。
我们从ActiveMQ来实践:(安装部署省掉)
Queue模式实践:
消息生产者:
public class Sender {
public static void main(String[] args) throws JMSException, InterruptedException {
// ConnectionFactory :连接工厂,JMS 用它创建连接
//61616是ActiveMQ默认端口
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
Destination destination = session.createQueue("my-queue");
// MessageProducer:消息发送者
MessageProducer producer = session.createProducer(destination);
// 设置不持久化,可以更改
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for(int i=0;i<10;i++){
//创建文本消息
TextMessage message = session.createTextMessage("hello.I am producer, this is a test message"+i);
Thread.sleep(1000);
//发送消息
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
消息接收者
// ConnectionFactory :连接工厂,JMS 用它创建连接
private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
public static void main(String[] args) throws JMSException {
// Connection :JMS 客户端到JMS Provider 的连接
final Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息送谁那获取.
Destination destination = session.createQueue("my-queue");
// 消费者,消息接收者
MessageConsumer consumer1 = session.createConsumer(destination);
consumer1.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
try {
TextMessage message = (TextMessage)msg ;
System.out.println("consumerOne收到消息: "+message.getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
运行之后控制台不会退出一直监听消息库,对于消息发送者的十条信息,控制输出:
consumerOne收到消息: hello.I am producer, this is a test message0
consumerOne收到消息: hello.I am producer, this is a test message1
consumerOne收到消息: hello.I am producer, this is a test message2
consumerOne收到消息: hello.I am producer, this is a test message3
consumerOne收到消息: hello.I am producer, this is a test message4
consumerOne收到消息: hello.I am producer, this is a test message5
consumerOne收到消息: hello.I am producer, this is a test message6
consumerOne收到消息: hello.I am producer, this is a test message7
consumerOne收到消息: hello.I am producer, this is a test message8
consumerOne收到消息: hello.I am producer, this is a test message9
如果此时另外一个线程也存在消费者监听该Queue,则两者交换输出,共输出10条
Topic模式实现
消息发布者
public static void main(String[] args) throws JMSException, InterruptedException {
// ConnectionFactory :连接工厂,JMS 用它创建连接
//61616是ActiveMQ默认端口
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
//Destination destination = session.createQueue("my-queue");
Destination destination = session.createTopic("STOCKS.myTopic"); //创建topic myTopic
// MessageProducer:消息发送者
MessageProducer producer = session.createProducer(destination);
// 设置不持久化,可以更改
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for(int i=0;i<10;i++){
//创建文本消息
TextMessage message = session.createTextMessage("hello.I am producer, this is a test message"+i);
//发送消息
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
消息订阅者
private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
public void run() {
// Connection :JMS 客户端到JMS Provider 的连接
try {
final Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息送谁那获取.
// Destination destination = session.createQueue("my-queue");
Destination destination = session.createTopic("STOCKS.myTopic"); // 创建topic
// myTopic
// 消费者,消息接收者
MessageConsumer consumer1 = session.createConsumer(destination);
consumer1.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
TextMessage message = (TextMessage) msg;
System.out.println("consumerOne收到消息: " + message.getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 再来一个消费者,消息接收者
MessageConsumer consumer2 = session.createConsumer(destination);
consumer2.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
TextMessage message = (TextMessage) msg;
System.out.println("consumerTwo收到消息: " + message.getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
最后消息会重复输出:
consumerOne收到消息: hello.I am producer, this is a test message0
consumerTwo收到消息: hello.I am producer, this is a test message0
consumerOne收到消息: hello.I am producer, this is a test message1
consumerTwo收到消息: hello.I am producer, this is a test message1
consumerOne收到消息: hello.I am producer, this is a test message2
consumerTwo收到消息: hello.I am producer, this is a test message2
consumerOne收到消息: hello.I am producer, this is a test message3
consumerTwo收到消息: hello.I am producer, this is a test message3
consumerOne收到消息: hello.I am producer, this is a test message4
consumerTwo收到消息: hello.I am producer, this is a test message4
consumerOne收到消息: hello.I am producer, this is a test message5
consumerTwo收到消息: hello.I am producer, this is a test message5
consumerOne收到消息: hello.I am producer, this is a test message6
consumerTwo收到消息: hello.I am producer, this is a test message6
consumerOne收到消息: hello.I am producer, this is a test message7
consumerTwo收到消息: hello.I am producer, this is a test message7
consumerOne收到消息: hello.I am producer, this is a test message8
consumerTwo收到消息: hello.I am producer, this is a test message8
consumerOne收到消息: hello.I am producer, this is a test message9
我们简单总结一下使用MQ的过程:
- 1.创建与MQ的链接
- 2.创建消息的目的地或者来原地即Destination
- 3.发送消息或者制定对应的MessageListener
上述就是关于MQ两种消息模型的简单应用,至于具体的细节。如在消费者监听消息时有哪些Listener类型,生产者发送消息时有哪些Message类型。生成Session时参数1表示是否开启事务,至于事务的处理,消息的持久化等等。后面慢慢介绍。
上一篇: thinkPHP中Html模板标签怎么用
下一篇: 关于PHP传值与传引用的奇怪有关问题