JMS 五种消息体的发送/接收(三)
程序员文章站
2022-05-16 12:23:57
...
转载请出自出处:http://eksliang.iteye.com/blog/2242642
在JMS与ActiveMQ简介(一)中介绍了JMS消息的组成,由三个部分组成,head(头)、properties(属性)、body(包体),其中body中存放要发送给接收应用程序的内容。每个消息接口特定于它所支持的内容类型。JMS为不同类型的内容提供了他们各自的消息类型,但是所有消息都派生自Message接口。
- StreamMessage 一种主体中包含Java基元值流的消息。其填充和读取均按顺序进行。
- MapMessage 一种主体中包含一组键--值对的消息。没有定义条目顺序。
- TextMessage 一种主体中包含Java字符串的消息(例如,XML消息)。
- ObjectMessage 一种主体中包含序列化Java对象的消息。
- BytesMessage 一种主体中包含连续字节流的消息。
下面是对这五种消息的示例:
一、序列化对象,用于ObjectMessage的测试,代码如下:
package com.gosun.activemq.advanced; import java.io.Serializable; public class User implements Serializable{ private static final long serialVersionUID = 1454L; private String userName; private String userPwd; private Integer sex; private Float sal; get() set().....! @Override public String toString() { return "User [userName=" + userName + ", userPwd=" + userPwd + ", sex=" + sex + ", sal=" + sal + "]"; } }
二、发送消息的生产者
package com.gosun.activemq.advanced; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; /** * 消息队列模型发送消息至activeMQ * @author Ickes */ public class QueueSend { public static void main(String[] args) throws Exception { ActiveMQConnectionFactory connectionfactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionfactory.createConnection(); connection.start(); Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination destination = new ActiveMQQueue("gosun"); MessageProducer producer = session.createProducer(destination); //设置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //发送--文本消息 TextMessage msgText = session.createTextMessage(); msgText.setText("我是文本消息"); //发送--Map消息 MapMessage msgMap = session.createMapMessage(); msgMap.setString("name","ickes"); msgMap.setInt("sal",12000); //发送--序列化消息 ObjectMessage msgObj = session.createObjectMessage(); User user = new User(); user.setUserName("ickes"); user.setSex(1); user.setUserPwd("1234"); user.setSal(120000f); msgObj.setObject(user); //发送--消息流 StreamMessage smsg = session.createStreamMessage(); smsg.writeString("我是StreamMessage流消息"); //发送--字节消息 String byStrs = "我是字节消息"; BytesMessage bmsg = session.createBytesMessage(); bmsg.writeBytes(byStrs.getBytes()); //第六步:生产者向JMS发送消息到队列 producer.send(msgText); producer.send(msgMap); producer.send(msgObj); producer.send(smsg); producer.send(bmsg); //第七步:关闭连接 session.close(); connection.close(); } }
三、接收消息的消费者
package com.gosun.activemq.advanced; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者,手动接收示例 * @author Ickes */ public class QueuesAccept { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = connectionFactory.createConnection(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); //消息目的地 Destination dest = session.createQueue("gosun"); //消息消费者 MessageConsumer consumer = session.createConsumer(dest); //接收消息,超时时间为10秒,先手动接受JMS消息,这儿可以用监听 Message msg = consumer.receive(10*1000); if(msg instanceof TextMessage){//接收text消息 System.out.println(((TextMessage) msg).getText()); }else if(msg instanceof MapMessage){//接收map消息 System.out.println(((MapMessage) msg).getString("name")); System.out.println(((MapMessage) msg).getString("sal")); }else if (msg instanceof ObjectMessage) {//接收序列号消息 System.out.println(((ObjectMessage) msg).getObject()); }else if(msg instanceof StreamMessage){ //接收流消息 System.out.println(((StreamMessage)msg).readString()); }else if(msg instanceof BytesMessage){ //接收字节消息 byte[] bs = new byte[1024]; BytesMessage message = (BytesMessage)msg; while(message.readBytes(bs) !=- 1){ System.out.println(new String(bs)); } } //关闭通道 consumer.close(); session.close(); conn.close(); } }
推荐阅读