欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java结合WebSphere MQ实现接收队列文件功能

程序员文章站 2024-03-07 15:11:39
首先我们先来简单介绍下websphere mq以及安装使用简介 websphere mq  : 用于传输信息 具有跨平台的功能。 1 安装webspher...

首先我们先来简单介绍下websphere mq以及安装使用简介

websphere mq  : 用于传输信息 具有跨平台的功能。

1 安装websphere mq 并启动

2 websphere mq 建立 queue manager (如:mqsi_sample_qm)

3 建立queue 类型选择 local类型 的 (如lq  )

4 建立channels 类型选择server connection (如bridgechannel)

接下来,我们来看实例代码:

mqfilereceiver.java

package com.mq.dpca.file;
 
import java.io.file;
import java.io.fileoutputstream;
 
import com.ibm.mq.mqenvironment;
import com.ibm.mq.mqexception;
import com.ibm.mq.mqgetmessageoptions;
import com.ibm.mq.mqmessage;
import com.ibm.mq.mqqueue;
import com.ibm.mq.mqqueuemanager;
import com.ibm.mq.constants.mqconstants;
import com.mq.dpca.msg.mqconfig;
import com.mq.dpca.util.readcmdline;
import com.mq.dpca.util.renameutil;
 
/**
 * 
 * mq分组接收文件功能
 * 主动轮询
 */
public class mqfilereceiver {
  private mqqueuemanager qmgr; // 连接到队列管理器
 
  private mqqueue inqueue; // 传输队列
 
  private string queuename = ""; // 队列名称
 
  private string host = ""; //
 
  private int port = 1414; // 侦听器的端口号
 
  private string channel = ""; // 通道名称
 
  private string qmgrname = ""; // 队列管理器
 
  private mqmessage inmsg; // 创建消息缓冲
 
  private mqgetmessageoptions gmo; // 设置获取消息选项
 
  private static string filename = null; // 接收队列上的消息并存入文件
 
  private int ccsid = 0;
 
  private static string file_dir = null;
 
  /**
   * 程序的入口
   * 
   * @param args
   */
  public static void main(string args[]) {
    mqfilereceiver mfs = new mqfilereceiver();
    //初始化连接
    mfs.initproperty();
    //接收文件
    mfs.rungoupreceiver();
    //获取shell脚本名
//   string shellname = mqconfig.getvaluebykey(filename);
//   if(shellname!=null&&!"".equals(shellname)){
//     //调用shell
//     readcmdline.callshell(shellname);
//   }else{
//     system.out.println("have no shell name,only receive files.");
//   }
 
  }
 
  public void rungoupreceiver() {
    try {
      init();
      getgroupmessages();
      qmgr.commit();
      system.out.println("\n messages successfully receive ");
    } catch (mqexception mqe) {
      mqe.printstacktrace();
      try {
        system.out.println("\n backing out transaction ");
        qmgr.backout();
        system.exit(2);
      } catch (exception e) {
        e.printstacktrace();
        system.exit(2);
      }
    } catch (exception e) {
      e.printstacktrace();
      system.exit(2);
    }
  }
 
  /**
   * 初始化服务器连接信息
   * 
   * @throws exception
   */
  private void init() throws exception {
    /* 为客户机连接设置mqenvironment属性 */
    mqenvironment.hostname = host;
    mqenvironment.channel = channel;
    mqenvironment.port = port;
 
    /* 连接到队列管理器 */
    qmgr = new mqqueuemanager(qmgrname);
 
    /* 设置队列打开选项以输 */
    int opnoptn = mqconstants.mqoo_input_as_q_def
        | mqconstants.mqoo_fail_if_quiescing;
 
    /* 打开队列以输 */
    inqueue = qmgr.accessqueue(queuename, opnoptn, null, null, null);
  }
 
  /**
   * 接受文件的主函数
   * 
   * @throws exception
   */
  public void getgroupmessages() {
    /* 设置获取消息选项 */
    gmo = new mqgetmessageoptions();
    gmo.options = mqconstants.mqgmo_fail_if_quiescing;
    gmo.options = gmo.options + mqconstants.mqgmo_syncpoint;
    /* 等待消息 */
    gmo.options = gmo.options + mqconstants.mqgmo_wait;
    /* 设置等待时间限制 */
    gmo.waitinterval = 5000;
    /* 只获取消息 */
    gmo.options = gmo.options + mqconstants.mqgmo_all_msgs_available;
    /* 以辑顺序获取消息 */
    gmo.options = gmo.options + mqconstants.mqgmo_logical_order;
    gmo.matchoptions = mqconstants.mqmo_match_group_id;
    /* 创建消息缓冲 */
    inmsg = new mqmessage();
    try {
      fileoutputstream fos = null;
      /* 处理组消息 */
      while (true) {
        try {
          inqueue.get(inmsg, gmo);
          if (fos == null) {
            try {
              filename = inmsg.getstringproperty("filename");
              string filename_full = null;
              filename_full = file_dir + renameutil.rename(filename);
              fos = new fileoutputstream(new file(filename_full));
              int msglength = inmsg.getmessagelength();
              byte[] buffer = new byte[msglength];
              inmsg.readfully(buffer);
              fos.write(buffer, 0, msglength);
              /* 查看是否是最后消息标识 */
              char x = gmo.groupstatus;
              if (x == mqconstants.mqgs_last_msg_in_group) {
                system.out.println("last msg in group");
                break;
              }
              inmsg.clearmessage();
 
            } catch (exception e) {
              system.out
                  .println("receiver the message without property,do nothing!");
              inmsg.clearmessage();
            }
          } else {
            int msglength = inmsg.getmessagelength();
            byte[] buffer = new byte[msglength];
            inmsg.readfully(buffer);
            fos.write(buffer, 0, msglength);
            /* 查看是否是最后消息标识 */
            char x = gmo.groupstatus;
            if (x == mqconstants.mqgs_last_msg_in_group) {
              system.out.println("last msg in group");
              break;
            }
            inmsg.clearmessage();
          }
        } catch (exception e) {
          char x = gmo.groupstatus;
          if (x == mqconstants.mqgs_last_msg_in_group) {
            system.out.println("last msg in group");
          }
          break;
        }
      }
      if (fos != null)
        fos.close();
    } catch (exception e) {
      system.out.println(e.getmessage());
    }
  }
 
  public void initproperty() {
    mqconfig config = new mqconfig().getinstance();
    if (config.getmq_manager() != null) {
      qmgrname = config.getmq_manager();
      queuename = config.getmq_queue_name();
      channel = config.getmq_channel();
      host = config.getmq_host_name();
      port = integer.valueof(config.getmq_prot());
      ccsid = integer.valueof(config.getmq_ccsid());
      file_dir = config.getfile_dir();
    }
  }
}