MQ客户端
程序员文章站
2022-07-13 12:29:18
...
public class mqSendSample {
@SuppressWarnings("rawtypes")
private static Hashtable<String, Comparable> env = new Hashtable<String, Comparable>();
// 队列管理器名
private static String queueManagerName;
// 队列管理器引用
private static MQQueueManager queueManager;
// 队列名
private static String queueName;
// 队列引用
private MQQueue queue;
/**
* <应用启动时初始化队列管理器连
* 由于连接队列管理器如同连接数据一样,建立时需要资源较多, 连接时间较长,因此不要每次创建关闭,建议应用程序保持
* 或多个队列管理器连接但应用关闭时注意关闭连接,释放资源!
*
* @throws Exception
*/
@BeforeClass
public static void initEnvironment() throws Exception {
// 服务器地id、名称
env.put(MQConstants.HOST_NAME_PROPERTY, "192.168.102.29");
// 连接通道
env.put(MQConstants.CHANNEL_PROPERTY, "ESB_TEST");
// 服务器MQ服务使用的编1381代表GBK,1208代表UTF(Coded Character Set Identifier:CCSID)
env.put(MQConstants.CCSID_PROPERTY, 1208);
// 端口号
env.put(MQConstants.PORT_PROPERTY, 1414);
// 传输类型
env.put(MQConstants.TRANSPORT_PROPERTY, MQConstants.TRANSPORT_MQSERIES);
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
// 设置目标队列管理器
queueManagerName = "AIRP_QMGR_002";
// 设置目标队列
queueName = "topic_in3";
// 建立队列管理器连接
connectQM();
}
/**
* 程序结束时释放队列管理连接资源
*
* @throws Exception
*/
@AfterClass
public static void destroyEnvironment() throws Exception {
disconnectQM();
}
@Test
public void testSend() throws Exception {
// 队列打开参数
int openOptions = MQConstants.MQOO_BIND_AS_Q_DEF
| MQConstants.MQOO_OUTPUT;
// 打开队列(同一线程内,同时只能打开该队列一次)
queue = queueManager.accessQueue(queueName, openOptions);
// 设置发送消息参数为:具有同步性,及支持事务
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQConstants.MQPMO_SYNCPOINT;
try {
// 发送消息(这是为同时多条消息发送)
for (int i = 0; i < 10; i++) {
// 设置消息格式为字符串类型
MQMessage msg = new MQMessage();
msg.format = MQConstants.MQFMT_STRING;
/*
* 设置自定义消息头
*/
msg.setStringProperty("sys", "md"); // 服务id
msg.characterSet = 1208;
// 消息内容
String message = "<msg>你好<msg>";
// 设置消息内容
msg.writeString(message);
// 发�?消息
queue.put(msg, pmo);
}
// 提交事务
queueManager.commit();
} catch (MQException e) {
// 事务回滚
queueManager.backout();
e.printStackTrace();
} finally {
// 关闭队列
if (queue != null) {
queue.close();
}
}
}
private static void connectQM() throws Exception {
queueManager = new MQQueueManager(queueManagerName, env);
}
private static void disconnectQM() throws Exception {
if (queueManager != null) {
queueManager.disconnect();
}
}
public static void main(String[] args) throws Exception {
mqSendSample queue1 = new mqSendSample();
queue1.initEnvironment();
// 队列打开参数
int openOptions = MQConstants.MQOO_BIND_AS_Q_DEF
| MQConstants.MQOO_OUTPUT;
// 打开队列(同一线程内,同时只能打开该队列一次)
queue1.queue = queue1.queueManager.accessQueue(queueName, openOptions);
// 设置发送消息参数为:具有同步性,及支持事务
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQConstants.MQPMO_SYNCPOINT;
try {
// 设置消息格式为字符串类型
MQMessage msg = new MQMessage();
msg.format = MQConstants.MQFMT_STRING;
// 消息内容编码(1208:utf-8)
msg.characterSet = 1208;
msg.setStringProperty("sys", "md");
String message = "<root><RequestHead><RequestID>123</RequestID><SourceSystem>VMSCODE</SourceSystem>" +
"<TargetSystem>DCECODE</TargetSystem><ServiceName>S0802001A</ServiceName>" +
"<ServiceOperation>TO_ZF</ServiceOperation>" +
"<ServiceVersion></ServiceVersion></RequestHead><RequestBody><updateProductReq><product><action>2</action><prod_id>1010002956</prod_id><prod_code>1010002956</prod_code><prod_simple_code></prod_simple_code><prod_name>测试产品00013</prod_name><unit_id>118</unit_id><prod_memo2k></prod_memo2k><prod_list_price>200.0000</prod_list_price></product></updateProductReq></RequestBody></root>";
// 设置消息内容
msg.writeString(message);
// 发�?消息
queue1.queue.put(msg, pmo);
// 提交事务
queue1.queueManager.commit();
} catch (MQException e) {
// 事务回滚
queue1.queueManager.backout();
e.printStackTrace();
} finally {
// 关闭队列
if (queue1.queue != null) {
queue1.queue.close();
}
}
}
}
@SuppressWarnings("rawtypes")
private static Hashtable<String, Comparable> env = new Hashtable<String, Comparable>();
// 队列管理器名
private static String queueManagerName;
// 队列管理器引用
private static MQQueueManager queueManager;
// 队列名
private static String queueName;
// 队列引用
private MQQueue queue;
/**
* <应用启动时初始化队列管理器连
* 由于连接队列管理器如同连接数据一样,建立时需要资源较多, 连接时间较长,因此不要每次创建关闭,建议应用程序保持
* 或多个队列管理器连接但应用关闭时注意关闭连接,释放资源!
*
* @throws Exception
*/
@BeforeClass
public static void initEnvironment() throws Exception {
// 服务器地id、名称
env.put(MQConstants.HOST_NAME_PROPERTY, "192.168.102.29");
// 连接通道
env.put(MQConstants.CHANNEL_PROPERTY, "ESB_TEST");
// 服务器MQ服务使用的编1381代表GBK,1208代表UTF(Coded Character Set Identifier:CCSID)
env.put(MQConstants.CCSID_PROPERTY, 1208);
// 端口号
env.put(MQConstants.PORT_PROPERTY, 1414);
// 传输类型
env.put(MQConstants.TRANSPORT_PROPERTY, MQConstants.TRANSPORT_MQSERIES);
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
// 设置目标队列管理器
queueManagerName = "AIRP_QMGR_002";
// 设置目标队列
queueName = "topic_in3";
// 建立队列管理器连接
connectQM();
}
/**
* 程序结束时释放队列管理连接资源
*
* @throws Exception
*/
@AfterClass
public static void destroyEnvironment() throws Exception {
disconnectQM();
}
@Test
public void testSend() throws Exception {
// 队列打开参数
int openOptions = MQConstants.MQOO_BIND_AS_Q_DEF
| MQConstants.MQOO_OUTPUT;
// 打开队列(同一线程内,同时只能打开该队列一次)
queue = queueManager.accessQueue(queueName, openOptions);
// 设置发送消息参数为:具有同步性,及支持事务
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQConstants.MQPMO_SYNCPOINT;
try {
// 发送消息(这是为同时多条消息发送)
for (int i = 0; i < 10; i++) {
// 设置消息格式为字符串类型
MQMessage msg = new MQMessage();
msg.format = MQConstants.MQFMT_STRING;
/*
* 设置自定义消息头
*/
msg.setStringProperty("sys", "md"); // 服务id
msg.characterSet = 1208;
// 消息内容
String message = "<msg>你好<msg>";
// 设置消息内容
msg.writeString(message);
// 发�?消息
queue.put(msg, pmo);
}
// 提交事务
queueManager.commit();
} catch (MQException e) {
// 事务回滚
queueManager.backout();
e.printStackTrace();
} finally {
// 关闭队列
if (queue != null) {
queue.close();
}
}
}
private static void connectQM() throws Exception {
queueManager = new MQQueueManager(queueManagerName, env);
}
private static void disconnectQM() throws Exception {
if (queueManager != null) {
queueManager.disconnect();
}
}
public static void main(String[] args) throws Exception {
mqSendSample queue1 = new mqSendSample();
queue1.initEnvironment();
// 队列打开参数
int openOptions = MQConstants.MQOO_BIND_AS_Q_DEF
| MQConstants.MQOO_OUTPUT;
// 打开队列(同一线程内,同时只能打开该队列一次)
queue1.queue = queue1.queueManager.accessQueue(queueName, openOptions);
// 设置发送消息参数为:具有同步性,及支持事务
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQConstants.MQPMO_SYNCPOINT;
try {
// 设置消息格式为字符串类型
MQMessage msg = new MQMessage();
msg.format = MQConstants.MQFMT_STRING;
// 消息内容编码(1208:utf-8)
msg.characterSet = 1208;
msg.setStringProperty("sys", "md");
String message = "<root><RequestHead><RequestID>123</RequestID><SourceSystem>VMSCODE</SourceSystem>" +
"<TargetSystem>DCECODE</TargetSystem><ServiceName>S0802001A</ServiceName>" +
"<ServiceOperation>TO_ZF</ServiceOperation>" +
"<ServiceVersion></ServiceVersion></RequestHead><RequestBody><updateProductReq><product><action>2</action><prod_id>1010002956</prod_id><prod_code>1010002956</prod_code><prod_simple_code></prod_simple_code><prod_name>测试产品00013</prod_name><unit_id>118</unit_id><prod_memo2k></prod_memo2k><prod_list_price>200.0000</prod_list_price></product></updateProductReq></RequestBody></root>";
// 设置消息内容
msg.writeString(message);
// 发�?消息
queue1.queue.put(msg, pmo);
// 提交事务
queue1.queueManager.commit();
} catch (MQException e) {
// 事务回滚
queue1.queueManager.backout();
e.printStackTrace();
} finally {
// 关闭队列
if (queue1.queue != null) {
queue1.queue.close();
}
}
}
}
推荐阅读
-
[PHP]利用XAMPP搭建本地服务器, 然后利用iOS客户端上传数据到本地服务器中(一.安装XAMPP) - M_Lee
-
php获得客户端浏览器名称及版本的方法(基于ECShop函数),
-
七牛云客户端上传视频
-
php 怎样获取当前客户端登陆用户的用户名解决方案
-
node.js基于dgram数据报模块创建UDP服务器和客户端操作示例
-
java WebSocket客户端断线重连的实现方法
-
Android客户端与PHP服务端通信(五)-移植使用极光推送
-
dragon boat festival php实现post接口 接收android客户端json数据 存储数据库 并返回json
-
使用 acl 库编写高效的 C++ redis 客户端应用
-
支持集群版 redis 的客户端例子