欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JMS 五种消息体的发送/接收(三)

程序员文章站 2022-05-16 12:23:57
...

转载请出自出处:http://eksliang.iteye.com/blog/2242642

JMS与ActiveMQ简介(一)中介绍了JMS消息的组成,由三个部分组成,head(头)、properties(属性)、body(包体),其中body中存放要发送给接收应用程序的内容。每个消息接口特定于它所支持的内容类型。JMS为不同类型的内容提供了他们各自的消息类型,但是所有消息都派生自Message接口。

  1. StreamMessage   一种主体中包含Java基元值流的消息。其填充和读取均按顺序进行。
  2. MapMessage       一种主体中包含一组键--值对的消息。没有定义条目顺序。
  3. TextMessage       一种主体中包含Java字符串的消息(例如,XML消息)。
  4. ObjectMessage    一种主体中包含序列化Java对象的消息。
  5. BytesMessage     一种主体中包含连续字节流的消息。

下面是对这五种消息的示例:

一、序列化对象,用于ObjectMessage的测试,代码如下:
package com.gosun.activemq.advanced;

import java.io.Serializable;
public class User implements Serializable{
	private static final long serialVersionUID = 1454L;
	private String  userName;
	private String  userPwd;
	private Integer sex;
	private Float   sal;

	get() set().....!

	@Override
	public String toString() {
		return "User [userName=" + userName + ", userPwd=" + userPwd + ", sex="
				+ sex + ", sal=" + sal + "]";
	}
}

 

二、发送消息的生产者
package com.gosun.activemq.advanced;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

/**
 * 消息队列模型发送消息至activeMQ
 * @author Ickes
 */
public class QueueSend {
	public static void main(String[] args) throws Exception {
		ActiveMQConnectionFactory connectionfactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = connectionfactory.createConnection(); 
		connection.start();  
		Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		Destination destination = new ActiveMQQueue("gosun"); 
		MessageProducer producer = session.createProducer(destination);  
		//设置不持久化
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		
		//发送--文本消息
		TextMessage msgText = session.createTextMessage();
		msgText.setText("我是文本消息");
		
		//发送--Map消息
		MapMessage msgMap = session.createMapMessage();
		msgMap.setString("name","ickes");
		msgMap.setInt("sal",12000);
		
		//发送--序列化消息
		ObjectMessage msgObj = session.createObjectMessage();
		User user = new User();
		user.setUserName("ickes");
		user.setSex(1);
		user.setUserPwd("1234");
		user.setSal(120000f);
		msgObj.setObject(user);
		
		//发送--消息流
		StreamMessage smsg = session.createStreamMessage();
		smsg.writeString("我是StreamMessage流消息");
		
		//发送--字节消息  
		String byStrs = "我是字节消息";
		BytesMessage bmsg = session.createBytesMessage();
		bmsg.writeBytes(byStrs.getBytes());
		
		//第六步:生产者向JMS发送消息到队列
		producer.send(msgText);
		producer.send(msgMap);
		producer.send(msgObj);
		producer.send(smsg);
		producer.send(bmsg);
        //第七步:关闭连接
		session.close();  
        connection.close(); 
	}
}

 

三、接收消息的消费者
package com.gosun.activemq.advanced;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者,手动接收示例
 * @author Ickes
 */
public class QueuesAccept {
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection conn = connectionFactory.createConnection();  
		Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 
		conn.start();
		//消息目的地
		Destination dest = session.createQueue("gosun");
		//消息消费者
		MessageConsumer consumer = session.createConsumer(dest);
		//接收消息,超时时间为10秒,先手动接受JMS消息,这儿可以用监听 
		Message msg = consumer.receive(10*1000);
		if(msg instanceof TextMessage){//接收text消息
			System.out.println(((TextMessage) msg).getText());
		}else if(msg instanceof MapMessage){//接收map消息
			System.out.println(((MapMessage) msg).getString("name"));
			System.out.println(((MapMessage) msg).getString("sal"));
		}else if (msg instanceof ObjectMessage) {//接收序列号消息
			System.out.println(((ObjectMessage) msg).getObject());
		}else if(msg instanceof StreamMessage){  //接收流消息
			System.out.println(((StreamMessage)msg).readString());
		}else if(msg instanceof BytesMessage){   //接收字节消息
			byte[] bs = new byte[1024];   
            BytesMessage message = (BytesMessage)msg;   
            while(message.readBytes(bs) !=- 1){   
                System.out.println(new String(bs));   
            }  
		}
	    //关闭通道
	    consumer.close();  
	    session.close();  
	    conn.close();  
	}
}