欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MQ客户端

程序员文章站 2022-03-07 13:28:36
...
public class mqSendSample {
@SuppressWarnings("rawtypes")
private static Hashtable<String, Comparable> env = new Hashtable<String, Comparable>();

// 队列管理器名
private static String queueManagerName;
// 队列管理器引用
private static MQQueueManager queueManager;
// 队列名
private static String queueName;
// 队列引用
private MQQueue queue;

/**
* <应用启动时初始化队列管理器连
* 由于连接队列管理器如同连接数据一样,建立时需要资源较多, 连接时间较长,因此不要每次创建关闭,建议应用程序保持
* 或多个队列管理器连接但应用关闭时注意关闭连接,释放资源!
*
* @throws Exception
*/
@BeforeClass
public static void initEnvironment() throws Exception {
// 服务器地id、名称
env.put(MQConstants.HOST_NAME_PROPERTY, "192.168.102.29");
// 连接通道
env.put(MQConstants.CHANNEL_PROPERTY, "ESB_TEST");
// 服务器MQ服务使用的编1381代表GBK,1208代表UTF(Coded Character Set Identifier:CCSID)
env.put(MQConstants.CCSID_PROPERTY, 1208);
// 端口号
env.put(MQConstants.PORT_PROPERTY, 1414);
// 传输类型
env.put(MQConstants.TRANSPORT_PROPERTY, MQConstants.TRANSPORT_MQSERIES);
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
// 设置目标队列管理器
queueManagerName = "AIRP_QMGR_002";
// 设置目标队列
queueName = "topic_in3";

// 建立队列管理器连接
connectQM();
}

/**
* 程序结束时释放队列管理连接资源
*
* @throws Exception
*/
@AfterClass
public static void destroyEnvironment() throws Exception {
disconnectQM();
}

@Test
public void testSend() throws Exception {
// 队列打开参数
int openOptions = MQConstants.MQOO_BIND_AS_Q_DEF
| MQConstants.MQOO_OUTPUT;
// 打开队列(同一线程内,同时只能打开该队列一次)
queue = queueManager.accessQueue(queueName, openOptions);
// 设置发送消息参数为:具有同步性,及支持事务
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQConstants.MQPMO_SYNCPOINT;
try {
// 发送消息(这是为同时多条消息发送)
for (int i = 0; i < 10; i++) {
// 设置消息格式为字符串类型
MQMessage msg = new MQMessage();
msg.format = MQConstants.MQFMT_STRING;
/*
* 设置自定义消息头
*/
msg.setStringProperty("sys", "md"); // 服务id
msg.characterSet = 1208;
// 消息内容
String message = "<msg>你好<msg>";
// 设置消息内容
msg.writeString(message);
// 发�?消息
queue.put(msg, pmo);

}
// 提交事务
queueManager.commit();

} catch (MQException e) {
// 事务回滚
queueManager.backout();
e.printStackTrace();
} finally {
// 关闭队列
if (queue != null) {
queue.close();
}
}

}

private static void connectQM() throws Exception {
queueManager = new MQQueueManager(queueManagerName, env);
}

private static void disconnectQM() throws Exception {
if (queueManager != null) {
queueManager.disconnect();
}
}

public static void main(String[] args) throws Exception {
mqSendSample queue1 = new mqSendSample();
queue1.initEnvironment();
// 队列打开参数
int openOptions = MQConstants.MQOO_BIND_AS_Q_DEF
| MQConstants.MQOO_OUTPUT;
// 打开队列(同一线程内,同时只能打开该队列一次)
queue1.queue = queue1.queueManager.accessQueue(queueName, openOptions);
// 设置发送消息参数为:具有同步性,及支持事务
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQConstants.MQPMO_SYNCPOINT;
try {
// 设置消息格式为字符串类型
MQMessage msg = new MQMessage();
msg.format = MQConstants.MQFMT_STRING;
// 消息内容编码(1208:utf-8)
msg.characterSet = 1208;
msg.setStringProperty("sys", "md");
String message = "<root><RequestHead><RequestID>123</RequestID><SourceSystem>VMSCODE</SourceSystem>" +
"<TargetSystem>DCECODE</TargetSystem><ServiceName>S0802001A</ServiceName>" +
"<ServiceOperation>TO_ZF</ServiceOperation>" +
"<ServiceVersion></ServiceVersion></RequestHead><RequestBody><updateProductReq><product><action>2</action><prod_id>1010002956</prod_id><prod_code>1010002956</prod_code><prod_simple_code></prod_simple_code><prod_name>测试产品00013</prod_name><unit_id>118</unit_id><prod_memo2k></prod_memo2k><prod_list_price>200.0000</prod_list_price></product></updateProductReq></RequestBody></root>";
// 设置消息内容
msg.writeString(message);
// 发�?消息
queue1.queue.put(msg, pmo);

// 提交事务
queue1.queueManager.commit();

} catch (MQException e) {
// 事务回滚
queue1.queueManager.backout();
e.printStackTrace();
} finally {
// 关闭队列
if (queue1.queue != null) {
queue1.queue.close();
}
}
}
}
相关标签: MQ