Spring Boot: Listening for and Receiving Data from IBM MQ, and Sending Data

程序员文章站 2023-04-04 16:50:33
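I. Requirements

The back end uses the Spring Boot 2.0 framework. It needs to listen for IBM MQ data in real time over JMS, process each received message, and send a receipt back through an MQ queue.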

II. Add the dependency JARs

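<dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <!-- Spring Boot 2.0 manages Spring Framework 5.0.x; if you inherit its dependency
             management, consider dropping this version so spring-jms matches the other Spring modules -->
        <version>4.3.18.RELEASE</version>
</dependency>
<dependency>
        <!-- no version here: it is expected to come from Spring Boot's dependency management -->
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
</dependency>
<dependency>
        <groupId>com.ibm.mq</groupId>
        <artifactId>com.ibm.mq.allclient</artifactId>
        <version>9.1.0.0</version>
</dependency>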

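III. Listener implementation

The code falls into three parts:

1. The MQ channel connection. I connect with a username and password here; if your queue manager does not require credentials, you can leave them out.

2. The MQ queue connection and the listener on it.

3. Sending to MQ.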

import javax.jms.JMSException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.SimpleMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;

import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;

@Configuration
public class MqTestConfig {

    @Autowired
    private MqProperties mqProperties;

    /**======================= MQ channel factory ============================**/
    // Connects to the queue manager over a client channel
    @Bean(name="mqQueueConnectionFactory")
    public MQQueueConnectionFactory mqQueueConnectionFactory(){
        MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
        mqQueueConnectionFactory.setHostName(mqProperties.getHostName());
        try {
            mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            mqQueueConnectionFactory.setCCSID(mqProperties.getCcsid());
            mqQueueConnectionFactory.setChannel(mqProperties.getChannel());
            mqQueueConnectionFactory.setPort(mqProperties.getPort());
            mqQueueConnectionFactory.setQueueManager(mqProperties.getQueueManager());
        } catch (JMSException e) {
            // Fail at startup instead of continuing with a half-configured factory
            throw new IllegalStateException("Invalid IBM MQ connection settings", e);
        }
        return mqQueueConnectionFactory;
    }
    // Wraps the factory so every connection is opened with the configured username and password
    @Bean(name="userCredentialsConnectionFactoryAdapter")
    public UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory mqQueueConnectionFactory){
        UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
        userCredentialsConnectionFactoryAdapter.setUsername(mqProperties.getUsername());
        userCredentialsConnectionFactoryAdapter.setPassword(mqProperties.getPassword());
        userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
        return userCredentialsConnectionFactoryAdapter;
    }

    /**============================ MQ message listening and receiving =============================**/
    // Queue to receive from
    @Bean(name="mqueue")
    public MQQueue mqueue(){
        MQQueue mqQueue = new MQQueue();
        try {
            mqQueue.setBaseQueueName(mqProperties.getBaseQueueNameRecv());
            mqQueue.setBaseQueueManagerName(mqProperties.getBaseQueueManagerName());
        } catch (JMSException e) {
            throw new IllegalStateException("Invalid IBM MQ receive queue settings", e);
        }
        return mqQueue;
    }
    // Listens on the queue
    @Bean(name="simpleMessageListenerContainer")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter, MQQueue mqueue){
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setConnectionFactory(userCredentialsConnectionFactoryAdapter);
        simpleMessageListenerContainer.setDestination(mqueue);
        simpleMessageListenerContainer.setMessageListener(decMqRiskRecvService());
        return simpleMessageListenerContainer;
    }
    // Message handler class
    @Bean(name="decMqRiskRecvService")
    public DecMqRiskRecvService decMqRiskRecvService(){
        return new DecMqRiskRecvService();
    }

    /**============================ MQ message sending ============================**/
    // Caches sessions so that each send does not open a new connection
    @Bean(name="cachingConnectionFactory")
    public CachingConnectionFactory cachingConnectionFactory(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
        cachingConnectionFactory.setSessionCacheSize(5);
        cachingConnectionFactory.setReconnectOnException(true);
        return cachingConnectionFactory;
    }
    @Bean(name="jmsTransactionManager")
    public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory){
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
        jmsTransactionManager.setConnectionFactory(cachingConnectionFactory);
        return jmsTransactionManager;
    }
    @Bean(name="jmsOperations")
    public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory){
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        jmsTemplate.setReceiveTimeout(mqProperties.getReceiveTimeout());
        return jmsTemplate;
    }

}

MQ configuration properties

Remember to add the getter and setter methods.

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix=MqProperties.MQ_PREFIX)
public class MqProperties {
    public static final String MQ_PREFIX = "mq";
    private String hostName;
    private int port;
    private String channel;
    private int ccsid;
    private String username;
    private String password;
    private String queueManager;
    private String baseQueueManagerName;
    private String baseQueueNameRecv;
    private String baseQueueNameSend;
    private long receiveTimeout;

    // Getters and setters omitted here; property binding needs them

}
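
For orientation, the properties class above could be fed from an application.properties file along the following lines. This is only a sketch: every value is a placeholder chosen for illustration (host, channel, queue manager, queue names and credentials are not from the original setup), and the keys assume the field names shown above, written in kebab-case for Spring Boot's relaxed binding.

```properties
# All values are hypothetical placeholders; replace them with your own environment's settings.
mq.host-name=127.0.0.1
# 1414 is the conventional default port of an IBM MQ listener
mq.port=1414
mq.channel=DEV.APP.SVRCONN
# 1208 is the CCSID for UTF-8
mq.ccsid=1208
mq.username=app
mq.password=change-me
mq.queue-manager=QM1
mq.base-queue-manager-name=QM1
mq.base-queue-name-recv=DEV.QUEUE.1
mq.base-queue-name-send=DEV.QUEUE.2
# Receive timeout in milliseconds, passed to JmsTemplate.setReceiveTimeout
mq.receive-timeout=5000
```

If the queue manager does not require credentials, the username and password entries (and the credentials adapter bean) can presumably be dropped, as noted in part 1 of the list above.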

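Message handler class and receipt sending

1. The handler class must implement MessageListener and override onMessage; the message argument is the message the listener received.

2. To avoid garbled characters when reading the message, I read and transcode it in one of two ways, depending on the message type.

3. Sending the receipt. Earlier sends turned out to carry some extra message header information (the MQRFH2 header that the JMS client adds), so I appended a parameter to the queue name:

  "queue:///" + mqProperties.getBaseQueueNameSend() + "?targetClient=1"

  Messages sent this way no longer carry that header information.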
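
The queue URI from point 3 can be built in one place instead of being concatenated at each call site. The helper below is a minimal sketch using only the JDK; the class name MqQueueUri, the method nonJms and the queue name RECEIPT.QUEUE are made up for illustration. The value 1 is the one IBM MQ exposes as WMQConstants.WMQ_CLIENT_NONJMS_MQ, which tells the client that the consumer is a plain MQ application.

```java
// Sketch: builds an IBM MQ queue URI whose target is a non-JMS (plain MQ) consumer.
final class MqQueueUri {

    // Same value as WMQConstants.WMQ_CLIENT_NONJMS_MQ; with it the message is sent without an MQRFH2 header.
    private static final int TARGET_CLIENT_NONJMS_MQ = 1;

    private MqQueueUri() { }

    /** Returns "queue:///NAME?targetClient=1" for the given queue name. */
    static String nonJms(String queueName) {
        if (queueName == null || queueName.trim().isEmpty()) {
            throw new IllegalArgumentException("queueName must not be blank");
        }
        return "queue:///" + queueName.trim() + "?targetClient=" + TARGET_CLIENT_NONJMS_MQ;
    }

    public static void main(String[] args) {
        // RECEIPT.QUEUE is a hypothetical queue name.
        System.out.println(nonJms("RECEIPT.QUEUE")); // queue:///RECEIPT.QUEUE?targetClient=1
    }
}
```

In the handler, the send call would then take MqQueueUri.nonJms(mqProperties.getBaseQueueNameSend()) as its destination name. An alternative that should have the same effect is to configure an MQQueue bean for the send queue and call setTargetClient(WMQConstants.WMQ_CLIENT_NONJMS_MQ) on it.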

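import java.nio.charset.StandardCharsets;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsOperations;
import org.springframework.stereotype.Service;

@Service
public class DecMqRiskRecvService implements MessageListener {

    @Autowired
    private JmsOperations jmsOperations;
    @Autowired
    private MqProperties mqProperties;

    @Override
    public void onMessage(Message message) {
        String str = null;
        // 1. Read the message
        try {
            if (message instanceof BytesMessage) {
                // Binary body: copy the bytes out and decode them as UTF-8
                BytesMessage bm = (BytesMessage) message;
                byte[] bys = new byte[(int) bm.getBodyLength()];
                bm.readBytes(bys);
                str = new String(bys, StandardCharsets.UTF_8);
            } else if (message instanceof TextMessage) {
                // Text body that arrived decoded as ISO-8859-1: recover the bytes and decode them as UTF-8
                str = ((TextMessage) message).getText();
                str = new String(str.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }

        // 2. Process the message

        // 3. Assemble the receipt and send it
        String receipt = "";
        try {
            jmsOperations.convertAndSend("queue:///" + mqProperties.getBaseQueueNameSend() + "?targetClient=1", receipt.getBytes(StandardCharsets.UTF_8));
        } catch (JmsException e) {
            // Spring's unchecked JmsException, not javax.jms.JMSException
            e.printStackTrace();
        }
    }

}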
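
The re-decoding in the TextMessage branch above only helps when the text really was UTF-8 that got decoded as ISO-8859-1 on the way in. The JDK-only sketch below illustrates both cases; the class name MojibakeDemo and the sample text are made up for illustration and no MQ connection is involved.

```java
import java.nio.charset.StandardCharsets;

// Sketch: shows when re-decoding ISO-8859-1 text as UTF-8 repairs it and when it damages it.
final class MojibakeDemo {

    private MojibakeDemo() { }

    /** Takes text that was decoded as ISO-8859-1 and decodes its bytes as UTF-8 instead. */
    static String redecode(String decodedAsLatin1) {
        byte[] raw = decodedAsLatin1.getBytes(StandardCharsets.ISO_8859_1);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "\u56de\u6267"; // sample non-ASCII (CJK) text

        // Simulate a sender that wrote UTF-8 bytes and a receiver that decoded them as ISO-8859-1.
        String garbled = new String(original.getBytes(StandardCharsets.UTF_8), StandardCharsets.ISO_8859_1);
        System.out.println(redecode(garbled).equals(original)); // true: the text is recovered

        // Text that was already decoded correctly cannot be encoded as ISO-8859-1,
        // so each character is replaced by '?' and the content is lost.
        System.out.println(redecode(original)); // ??
    }
}
```

Because of the second case, it may be worth checking how the sending side encodes its text messages (and which CCSID the connection uses) before applying the conversion unconditionally.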