java结合WebSphere MQ实现接收队列文件功能
程序员文章站
2024-03-07 15:11:39
首先我们先来简单介绍下websphere mq以及安装使用简介
websphere mq : 用于传输信息 具有跨平台的功能。
1 安装webspher...
首先我们先来简单介绍下websphere mq以及安装使用简介
websphere mq : 用于传输信息 具有跨平台的功能。
1 安装websphere mq 并启动
2 websphere mq 建立 queue manager (如:mqsi_sample_qm)
3 建立queue 类型选择 local类型 的 (如lq )
4 建立channels 类型选择server connection (如bridgechannel)
接下来,我们来看实例代码:
mqfilereceiver.java package com.mq.dpca.file; import java.io.file; import java.io.fileoutputstream; import com.ibm.mq.mqenvironment; import com.ibm.mq.mqexception; import com.ibm.mq.mqgetmessageoptions; import com.ibm.mq.mqmessage; import com.ibm.mq.mqqueue; import com.ibm.mq.mqqueuemanager; import com.ibm.mq.constants.mqconstants; import com.mq.dpca.msg.mqconfig; import com.mq.dpca.util.readcmdline; import com.mq.dpca.util.renameutil; /** * * mq分组接收文件功能 * 主动轮询 */ public class mqfilereceiver { private mqqueuemanager qmgr; // 连接到队列管理器 private mqqueue inqueue; // 传输队列 private string queuename = ""; // 队列名称 private string host = ""; // private int port = 1414; // 侦听器的端口号 private string channel = ""; // 通道名称 private string qmgrname = ""; // 队列管理器 private mqmessage inmsg; // 创建消息缓冲 private mqgetmessageoptions gmo; // 设置获取消息选项 private static string filename = null; // 接收队列上的消息并存入文件 private int ccsid = 0; private static string file_dir = null; /** * 程序的入口 * * @param args */ public static void main(string args[]) { mqfilereceiver mfs = new mqfilereceiver(); //初始化连接 mfs.initproperty(); //接收文件 mfs.rungoupreceiver(); //获取shell脚本名 // string shellname = mqconfig.getvaluebykey(filename); // if(shellname!=null&&!"".equals(shellname)){ // //调用shell // readcmdline.callshell(shellname); // }else{ // system.out.println("have no shell name,only receive files."); // } } public void rungoupreceiver() { try { init(); getgroupmessages(); qmgr.commit(); system.out.println("\n messages successfully receive "); } catch (mqexception mqe) { mqe.printstacktrace(); try { system.out.println("\n backing out transaction "); qmgr.backout(); system.exit(2); } catch (exception e) { e.printstacktrace(); system.exit(2); } } catch (exception e) { e.printstacktrace(); system.exit(2); } } /** * 初始化服务器连接信息 * * @throws exception */ private void init() throws exception { /* 为客户机连接设置mqenvironment属性 */ mqenvironment.hostname = host; mqenvironment.channel = channel; mqenvironment.port = port; /* 连接到队列管理器 */ qmgr = new mqqueuemanager(qmgrname); /* 设置队列打开选项以输 */ int opnoptn = mqconstants.mqoo_input_as_q_def | mqconstants.mqoo_fail_if_quiescing; /* 打开队列以输 */ inqueue = qmgr.accessqueue(queuename, opnoptn, null, null, null); } /** * 接受文件的主函数 * * @throws exception */ public void getgroupmessages() { /* 设置获取消息选项 */ gmo = new mqgetmessageoptions(); gmo.options = mqconstants.mqgmo_fail_if_quiescing; gmo.options = gmo.options + mqconstants.mqgmo_syncpoint; /* 等待消息 */ gmo.options = gmo.options + mqconstants.mqgmo_wait; /* 设置等待时间限制 */ gmo.waitinterval = 5000; /* 只获取消息 */ gmo.options = gmo.options + mqconstants.mqgmo_all_msgs_available; /* 以辑顺序获取消息 */ gmo.options = gmo.options + mqconstants.mqgmo_logical_order; gmo.matchoptions = mqconstants.mqmo_match_group_id; /* 创建消息缓冲 */ inmsg = new mqmessage(); try { fileoutputstream fos = null; /* 处理组消息 */ while (true) { try { inqueue.get(inmsg, gmo); if (fos == null) { try { filename = inmsg.getstringproperty("filename"); string filename_full = null; filename_full = file_dir + renameutil.rename(filename); fos = new fileoutputstream(new file(filename_full)); int msglength = inmsg.getmessagelength(); byte[] buffer = new byte[msglength]; inmsg.readfully(buffer); fos.write(buffer, 0, msglength); /* 查看是否是最后消息标识 */ char x = gmo.groupstatus; if (x == mqconstants.mqgs_last_msg_in_group) { system.out.println("last msg in group"); break; } inmsg.clearmessage(); } catch (exception e) { system.out .println("receiver the message without property,do nothing!"); inmsg.clearmessage(); } } else { int msglength = inmsg.getmessagelength(); byte[] buffer = new byte[msglength]; inmsg.readfully(buffer); fos.write(buffer, 0, msglength); /* 查看是否是最后消息标识 */ char x = gmo.groupstatus; if (x == mqconstants.mqgs_last_msg_in_group) { system.out.println("last msg in group"); break; } inmsg.clearmessage(); } } catch (exception e) { char x = gmo.groupstatus; if (x == mqconstants.mqgs_last_msg_in_group) { system.out.println("last msg in group"); } break; } } if (fos != null) fos.close(); } catch (exception e) { system.out.println(e.getmessage()); } } public void initproperty() { mqconfig config = new mqconfig().getinstance(); if (config.getmq_manager() != null) { qmgrname = config.getmq_manager(); queuename = config.getmq_queue_name(); channel = config.getmq_channel(); host = config.getmq_host_name(); port = integer.valueof(config.getmq_prot()); ccsid = integer.valueof(config.getmq_ccsid()); file_dir = config.getfile_dir(); } } }
下一篇: 浅谈Java中的Filter过滤器