欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JMS ActiveMQ Queue(点对点模型)实例(二)

程序员文章站 2022-04-27 18:01:35
...

转载请出自出处:http://eksliang.iteye.com/blog/2242495

一.使用消息队列模型发送消息至activeMQ(生产者开发)

 

package com.gosun.activemq;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

/**
 * 消息队列模型发送消息至activeMQ
 * @author Ickes
 */
public class QueueSend {
	public static void main(String[] args) throws Exception {
		//第一步:根据url创建一个jms Connection。 
		ActiveMQConnectionFactory connectionfactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = connectionfactory.createConnection(); 
		connection.start();  
		//第二步:根据connection获取session
		Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		//第三步:消息的目的地
		Destination destination = new ActiveMQQueue("gosun"); 
		//第四步:创建消息生产者
		MessageProducer producer = session.createProducer(destination);  
		//设置不持久化
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		//第五步:创建消息
		Message msg = session.createTextMessage("JMS 告诉你我是ICKES");
		//第六步:生产者向JMS发送消息到队列
		producer.send(msg);
        //第七步:关闭连接
		session.close();  
        connection.close(); 
	}
}

 第二步:Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);解释如下

 

 

在connection的基础上创建一个session,同时设置是否支持事务ACKNOWLEDGE标识。 

  • AUTO_ACKNOWLEDGE:自动确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收。  
  • CLIENT_ACKNOWLEDGE:客户端确认模式。会话对象依赖于应用程序对被接收的消息调用一个acknowledge()方法。一旦这个方法被调用,会话会确认最后一次确认之后所有接收到的消息。这种模式允许应用程序以一个调用来接收,处理并确认一批消息。注意:在管理控制台中,如果连接工厂的Acknowledge Policy(确认方针)属性被设置为"Previous"(提前),但是你希望为一个给定的会话确认所有接收到的消息,那么就用最后一条消息来调用acknowledge()方法。  
  • DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。注意:如果你的应用程序无法处理重复的消息的话,你应该避免使用这种模式。如果发送消息的初始化尝试失败,那么重复的消息可以被重新发送。 
  • SESSION_TRANSACTED

 

二、消息消费者,手动接收示例(消费者)

package com.gosun.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者,手动接收示例
 * @author Ickes
 */
public class QueuesAccept {
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection conn = connectionFactory.createConnection();  
		Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 
		conn.start();
		//消息目的地
		Destination dest = session.createQueue("gosun");
		//消息消费者
		MessageConsumer consumer = session.createConsumer(dest);
		//接收消息,超时时间为10秒,先手动接受JMS消息,这儿可以用监听 
		TextMessage textMessage = (TextMessage) consumer.receive(10*1000);
		String text = textMessage.getText();  
	    System.out.println("接收到的消息为:" + text);  
	    //关闭通道
	    consumer.close();  
	    session.close();  
	    conn.close();  
	}
}

 

三、消费者监听器模式处理消息

package com.gosun.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 使用监听器,自动接收消息
 * @author Lenovo
 *
 */
public class QueuesAcceptListener implements MessageListener{
	
	@Override
	public void onMessage(Message message) {
		 TextMessage text = (TextMessage) message;
		 try {
			System.out.println(text.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 测试代码
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection conn = connectionFactory.createConnection();  
		Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 
		conn.start();
		//消息目的地
		Destination dest = session.createQueue("gosun");
		//消息消费者
		MessageConsumer consumer = session.createConsumer(dest);
		consumer.setMessageListener(new QueuesAcceptListener());
		//这里不能关闭连接,一旦关闭监听器也就关闭,那就接收不到消息了
	}

}

 

四、生产者发送消息到主题中

package com.gosun.activemq.topic;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

/**
 * 生产者发送消息到主题中
 * @author Ickes
 *
 */
public class TopicSend {
	public static void main(String[] args) throws Exception {
		// 第一步:根据url创建一个jms Connection。
		ActiveMQConnectionFactory connectionfactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = connectionfactory.createConnection();
		connection.start();
		// 第二步:根据connection获取session
		Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		// 第三步:创建一个Topic
		Topic topic= new ActiveMQTopic("testTopic");  
		// 第四步:创建生产者用于将消息发送至主题
		MessageProducer producer = session.createProducer(topic); 
		// 设置不持久化
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		// 第五步:创建消息
		Message msg = session.createTextMessage("JMS 告诉你我是ICKES");
		producer.send(msg);
		//第七步:关闭连接
		session.close();
		connection.close();
	}
}

 

五、消费者从主题中订阅消息

package com.gosun.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消费者从主题中订阅消息
 * @author Ickes
 *
 */
public class TopicAccept {
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection conn = connectionFactory.createConnection();  
		Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 
		conn.start();
		//主题目的地
		Topic topic =session.createTopic("testTopic");
		//注册订阅者
		MessageConsumer consumer = session.createConsumer(topic);
		//手动获取消息
		TextMessage textMessage = (TextMessage) consumer.receive(10*1000);
		String text = textMessage.getText();  
	    System.out.println("接收到的消息为:" + text);  
	    //关闭通道
	    consumer.close();  
	    session.close();  
	    conn.close();  
		
		
	}
}