WebSphere MQ简单实例(接收消息)
程序员文章站
2022-07-13 12:17:46
...
MQ接收消息:
package com.main;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
/**
 * Receives messages from MQ through a Spring {@link JmsTemplate} and prints
 * their content to standard output.
 *
 * @author 88183239
 */
public class TestReceive
{
    /**
     * JMS template; wraps the connection factory, the queue and the message
     * producer/consumer plumbing.
     */
    private JmsTemplate jmsTemplate;

    /**
     * Loads "applicationContext.xml" from the classpath and looks up the bean
     * named "receiveTemplate" in it.
     */
    public TestReceive()
    {
        // NOTE(review): the context is never closed; confirm whether it should be
        // shut down once receiving is finished.
        ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
        jmsTemplate = (JmsTemplate)ctx.getBean("receiveTemplate");
    }

    /**
     * Receives two messages, one after the other, via
     * {@code JmsTemplate.receive()} and prints each of them.
     */
    public void showResult()
    {
        for (int received = 0; received < 2; received++)
        {
            onMessage(jmsTemplate.receive());
        }
    }

    /**
     * Prints the content of a message according to its JMS message type.
     * A {@code null} message, or a message that is none of the five handled
     * types, is ignored.
     *
     * @param msg the received message, may be {@code null}
     * @throws RuntimeException if reading a text or map message fails
     */
    private void onMessage(Message msg)
    {
        // The order of the type checks is the same as in the original chain.
        if (msg instanceof TextMessage)
        {
            printText((TextMessage)msg);
        }
        else if (msg instanceof ObjectMessage)
        {
            printObjectProperties((ObjectMessage)msg);
        }
        else if (msg instanceof MapMessage)
        {
            printMapValues((MapMessage)msg);
        }
        else if (msg instanceof BytesMessage)
        {
            printBytes((BytesMessage)msg);
        }
        else if (msg instanceof StreamMessage)
        {
            printStream((StreamMessage)msg);
        }
    }

    /** Prints the text body; a JMS failure is rethrown as a RuntimeException. */
    private void printText(TextMessage message)
    {
        try
        {
            System.out.println(message.getText());
        }
        catch (JMSException e)
        {
            throw new RuntimeException("JMSException", e);
        }
    }

    /**
     * Prints the "id" and "flag" properties of an object message; the object
     * payload itself is not read. A JMS failure is printed, not rethrown.
     */
    private void printObjectProperties(ObjectMessage message)
    {
        try
        {
            // Reading an absent property as int is defined as Integer.valueOf(null),
            // which throws NumberFormatException (unchecked, so the catch below would
            // not see it). Skip the id line when the property is missing.
            if (message.propertyExists("id"))
            {
                System.out.println(message.getIntProperty("id"));
            }
            // An absent boolean property reads as false, so no guard is needed here.
            System.out.println(message.getBooleanProperty("flag"));
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Prints every map entry's value as a string, in the order the names are
     * enumerated; a JMS failure is rethrown as a RuntimeException.
     */
    private void printMapValues(MapMessage message)
    {
        try
        {
            Enumeration<?> names = message.getMapNames();
            while (names.hasMoreElements())
            {
                String name = (String)names.nextElement();
                System.out.println(message.getString(name));
            }
        }
        catch (JMSException e)
        {
            throw new RuntimeException("JMSException", e);
        }
    }

    /**
     * Reads the whole body of a bytes message, decodes it as UTF-8 and prints
     * it. Failures are printed, not rethrown.
     */
    private void printBytes(BytesMessage message)
    {
        try
        {
            byte[] buff = new byte[(int)message.getBodyLength()];
            message.readBytes(buff);
            System.out.println(new String(buff, "UTF-8"));
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
        catch (UnsupportedEncodingException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Reads one value from the stream message as a string and prints it.
     * A JMS failure is printed, not rethrown.
     */
    private void printStream(StreamMessage message)
    {
        try
        {
            System.out.println(message.readString());
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Creates a receiver and prints the two messages it receives.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args)
    {
        TestReceive receiver = new TestReceive();
        receiver.showResult();
    }
}
MQ监听消息(实现 MessageListener 接口的监听类):
package com.mq;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
/**
 * Message listener that prints the content of each delivered JMS message to
 * standard output.
 *
 * @author
 */
public class ProductView implements MessageListener
{
    /**
     * Prints the content of a message according to its JMS message type.
     * A {@code null} message, or a message that is none of the five handled
     * types, is ignored.
     *
     * @param msg the delivered message
     * @throws RuntimeException if reading a text or map message fails
     */
    public void onMessage(Message msg)
    {
        // The order of the type checks is the same as in the original chain.
        if (msg instanceof TextMessage)
        {
            printText((TextMessage)msg);
        }
        else if (msg instanceof ObjectMessage)
        {
            printObjectProperties((ObjectMessage)msg);
        }
        else if (msg instanceof MapMessage)
        {
            printMapValues((MapMessage)msg);
        }
        else if (msg instanceof BytesMessage)
        {
            printBytes((BytesMessage)msg);
        }
        else if (msg instanceof StreamMessage)
        {
            printStream((StreamMessage)msg);
        }
    }

    /** Prints the text body; a JMS failure is rethrown as a RuntimeException. */
    private void printText(TextMessage message)
    {
        try
        {
            System.out.println(message.getText());
        }
        catch (JMSException e)
        {
            throw new RuntimeException("JMSException", e);
        }
    }

    /**
     * Prints the "id" and "flag" properties of an object message; the object
     * payload itself is not read. A JMS failure is printed, not rethrown.
     */
    private void printObjectProperties(ObjectMessage message)
    {
        try
        {
            // Reading an absent property as int is defined as Integer.valueOf(null),
            // which throws NumberFormatException (unchecked, so the catch below would
            // not see it). Skip the id line when the property is missing.
            if (message.propertyExists("id"))
            {
                System.out.println(message.getIntProperty("id"));
            }
            // An absent boolean property reads as false, so no guard is needed here.
            System.out.println(message.getBooleanProperty("flag"));
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Prints every map entry's value as a string, in the order the names are
     * enumerated; a JMS failure is rethrown as a RuntimeException.
     */
    private void printMapValues(MapMessage message)
    {
        try
        {
            Enumeration<?> names = message.getMapNames();
            while (names.hasMoreElements())
            {
                String name = (String)names.nextElement();
                System.out.println(message.getString(name));
            }
        }
        catch (JMSException e)
        {
            throw new RuntimeException("JMSException", e);
        }
    }

    /**
     * Reads the whole body of a bytes message, decodes it as UTF-8 and prints
     * it. Failures are printed, not rethrown.
     */
    private void printBytes(BytesMessage message)
    {
        try
        {
            byte[] buff = new byte[(int)message.getBodyLength()];
            message.readBytes(buff);
            System.out.println(new String(buff, "UTF-8"));
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
        catch (UnsupportedEncodingException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Reads one value from the stream message as a string and prints it.
     * A JMS failure is printed, not rethrown.
     */
    private void printStream(StreamMessage message)
    {
        try
        {
            System.out.println(message.readString());
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }
}
配置信息(Spring bean 定义片段):
<!-- MQ queue connection factory for queue manager QM_SN_CNHQ_9379C at 10.21.139.43:1414. -->
<!-- NOTE(review): CCSID 1381 is presumably a Simplified Chinese code page; confirm it matches the queue manager. -->
<bean id="jmsConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="hostName" value="10.21.139.43" />
<property name="port" value="1414" />
<property name="CCSID" value="1381" />
<property name="queueManager" value="QM_SN_CNHQ_9379C" />
</bean>
<!-- Queue destination whose base queue name is "default". -->
<bean id="queue" class="com.ibm.mq.jms.MQQueue">
<property name="baseQueueName" value="default" />
</bean>
<!-- JmsTemplate using the factory and queue above; pubSubDomain=false selects point-to-point (queue) messaging. -->
<!-- NOTE(review): TestReceive looks up a bean named "receiveTemplate", which this fragment does not define; confirm it is declared elsewhere or that the ids should match. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="defaultDestination" ref="queue" />
<property name="pubSubDomain" value="false" />
</bean>
<!-- Configuration used for receiving MQ data: a listener container that hands messages from the queue to com.mq.ProductView, started with 10 concurrent consumers. -->
<!-- NOTE(review): if this container is loaded in the same context as TestReceive, both presumably consume from the same queue; verify that is intended. -->
<bean id="productViewJmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="queue" />
<property name="messageListener">
<bean class="com.mq.ProductView" />
</property>
<property name="concurrentConsumers" value="10" />
</bean>
package com.main;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
/**
 * Receives messages from MQ through a Spring {@link JmsTemplate} and prints
 * their content to standard output.
 *
 * @author 88183239
 */
public class TestReceive
{
    /**
     * JMS template; wraps the connection factory, the queue and the message
     * producer/consumer plumbing.
     */
    private JmsTemplate jmsTemplate;

    /**
     * Loads "applicationContext.xml" from the classpath and looks up the bean
     * named "receiveTemplate" in it.
     */
    public TestReceive()
    {
        // NOTE(review): the context is never closed; confirm whether it should be
        // shut down once receiving is finished.
        ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
        jmsTemplate = (JmsTemplate)ctx.getBean("receiveTemplate");
    }

    /**
     * Receives two messages, one after the other, via
     * {@code JmsTemplate.receive()} and prints each of them.
     */
    public void showResult()
    {
        for (int received = 0; received < 2; received++)
        {
            onMessage(jmsTemplate.receive());
        }
    }

    /**
     * Prints the content of a message according to its JMS message type.
     * A {@code null} message, or a message that is none of the five handled
     * types, is ignored.
     *
     * @param msg the received message, may be {@code null}
     * @throws RuntimeException if reading a text or map message fails
     */
    private void onMessage(Message msg)
    {
        // The order of the type checks is the same as in the original chain.
        if (msg instanceof TextMessage)
        {
            printText((TextMessage)msg);
        }
        else if (msg instanceof ObjectMessage)
        {
            printObjectProperties((ObjectMessage)msg);
        }
        else if (msg instanceof MapMessage)
        {
            printMapValues((MapMessage)msg);
        }
        else if (msg instanceof BytesMessage)
        {
            printBytes((BytesMessage)msg);
        }
        else if (msg instanceof StreamMessage)
        {
            printStream((StreamMessage)msg);
        }
    }

    /** Prints the text body; a JMS failure is rethrown as a RuntimeException. */
    private void printText(TextMessage message)
    {
        try
        {
            System.out.println(message.getText());
        }
        catch (JMSException e)
        {
            throw new RuntimeException("JMSException", e);
        }
    }

    /**
     * Prints the "id" and "flag" properties of an object message; the object
     * payload itself is not read. A JMS failure is printed, not rethrown.
     */
    private void printObjectProperties(ObjectMessage message)
    {
        try
        {
            // Reading an absent property as int is defined as Integer.valueOf(null),
            // which throws NumberFormatException (unchecked, so the catch below would
            // not see it). Skip the id line when the property is missing.
            if (message.propertyExists("id"))
            {
                System.out.println(message.getIntProperty("id"));
            }
            // An absent boolean property reads as false, so no guard is needed here.
            System.out.println(message.getBooleanProperty("flag"));
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Prints every map entry's value as a string, in the order the names are
     * enumerated; a JMS failure is rethrown as a RuntimeException.
     */
    private void printMapValues(MapMessage message)
    {
        try
        {
            Enumeration<?> names = message.getMapNames();
            while (names.hasMoreElements())
            {
                String name = (String)names.nextElement();
                System.out.println(message.getString(name));
            }
        }
        catch (JMSException e)
        {
            throw new RuntimeException("JMSException", e);
        }
    }

    /**
     * Reads the whole body of a bytes message, decodes it as UTF-8 and prints
     * it. Failures are printed, not rethrown.
     */
    private void printBytes(BytesMessage message)
    {
        try
        {
            byte[] buff = new byte[(int)message.getBodyLength()];
            message.readBytes(buff);
            System.out.println(new String(buff, "UTF-8"));
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
        catch (UnsupportedEncodingException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Reads one value from the stream message as a string and prints it.
     * A JMS failure is printed, not rethrown.
     */
    private void printStream(StreamMessage message)
    {
        try
        {
            System.out.println(message.readString());
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Creates a receiver and prints the two messages it receives.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args)
    {
        TestReceive receiver = new TestReceive();
        receiver.showResult();
    }
}
MQ监听消息(实现 MessageListener 接口的监听类):
package com.mq;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
/**
 * Message listener that prints the content of each delivered JMS message to
 * standard output.
 *
 * @author
 */
public class ProductView implements MessageListener
{
    /**
     * Prints the content of a message according to its JMS message type.
     * A {@code null} message, or a message that is none of the five handled
     * types, is ignored.
     *
     * @param msg the delivered message
     * @throws RuntimeException if reading a text or map message fails
     */
    public void onMessage(Message msg)
    {
        // The order of the type checks is the same as in the original chain.
        if (msg instanceof TextMessage)
        {
            printText((TextMessage)msg);
        }
        else if (msg instanceof ObjectMessage)
        {
            printObjectProperties((ObjectMessage)msg);
        }
        else if (msg instanceof MapMessage)
        {
            printMapValues((MapMessage)msg);
        }
        else if (msg instanceof BytesMessage)
        {
            printBytes((BytesMessage)msg);
        }
        else if (msg instanceof StreamMessage)
        {
            printStream((StreamMessage)msg);
        }
    }

    /** Prints the text body; a JMS failure is rethrown as a RuntimeException. */
    private void printText(TextMessage message)
    {
        try
        {
            System.out.println(message.getText());
        }
        catch (JMSException e)
        {
            throw new RuntimeException("JMSException", e);
        }
    }

    /**
     * Prints the "id" and "flag" properties of an object message; the object
     * payload itself is not read. A JMS failure is printed, not rethrown.
     */
    private void printObjectProperties(ObjectMessage message)
    {
        try
        {
            // Reading an absent property as int is defined as Integer.valueOf(null),
            // which throws NumberFormatException (unchecked, so the catch below would
            // not see it). Skip the id line when the property is missing.
            if (message.propertyExists("id"))
            {
                System.out.println(message.getIntProperty("id"));
            }
            // An absent boolean property reads as false, so no guard is needed here.
            System.out.println(message.getBooleanProperty("flag"));
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Prints every map entry's value as a string, in the order the names are
     * enumerated; a JMS failure is rethrown as a RuntimeException.
     */
    private void printMapValues(MapMessage message)
    {
        try
        {
            Enumeration<?> names = message.getMapNames();
            while (names.hasMoreElements())
            {
                String name = (String)names.nextElement();
                System.out.println(message.getString(name));
            }
        }
        catch (JMSException e)
        {
            throw new RuntimeException("JMSException", e);
        }
    }

    /**
     * Reads the whole body of a bytes message, decodes it as UTF-8 and prints
     * it. Failures are printed, not rethrown.
     */
    private void printBytes(BytesMessage message)
    {
        try
        {
            byte[] buff = new byte[(int)message.getBodyLength()];
            message.readBytes(buff);
            System.out.println(new String(buff, "UTF-8"));
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
        catch (UnsupportedEncodingException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Reads one value from the stream message as a string and prints it.
     * A JMS failure is printed, not rethrown.
     */
    private void printStream(StreamMessage message)
    {
        try
        {
            System.out.println(message.readString());
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }
}
配置信息(Spring bean 定义片段):
<!-- MQ queue connection factory for queue manager QM_SN_CNHQ_9379C at 10.21.139.43:1414. -->
<!-- NOTE(review): CCSID 1381 is presumably a Simplified Chinese code page; confirm it matches the queue manager. -->
<bean id="jmsConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="hostName" value="10.21.139.43" />
<property name="port" value="1414" />
<property name="CCSID" value="1381" />
<property name="queueManager" value="QM_SN_CNHQ_9379C" />
</bean>
<!-- Queue destination whose base queue name is "default". -->
<bean id="queue" class="com.ibm.mq.jms.MQQueue">
<property name="baseQueueName" value="default" />
</bean>
<!-- JmsTemplate using the factory and queue above; pubSubDomain=false selects point-to-point (queue) messaging. -->
<!-- NOTE(review): TestReceive looks up a bean named "receiveTemplate", which this fragment does not define; confirm it is declared elsewhere or that the ids should match. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="defaultDestination" ref="queue" />
<property name="pubSubDomain" value="false" />
</bean>
<!-- Configuration used for receiving MQ data: a listener container that hands messages from the queue to com.mq.ProductView, started with 10 concurrent consumers. -->
<!-- NOTE(review): if this container is loaded in the same context as TestReceive, both presumably consume from the same queue; verify that is intended. -->
<bean id="productViewJmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="queue" />
<property name="messageListener">
<bean class="com.mq.ProductView" />
</property>
<property name="concurrentConsumers" value="10" />
</bean>
下一篇: IBM MQ消息的传递过程