SpringBoot 对IBM MQ进行数据监听接收以及数据发送
程序员文章站
2022-05-29 09:54:08
一、需求介绍 后端使用Spring Boot2.0框架,要实现IBM MQ的实时数据JMS监听接收处理,并形成回执通过MQ队列发送。 二、引入依赖jar包 org.springframework spring-j ......
一、需求介绍
后端使用spring boot2.0框架,要实现ibm mq的实时数据jms监听接收处理,并形成回执通过mq队列发送。
二、引入依赖jar包
<dependency> <groupid>org.springframework</groupid> <artifactid>spring-jms</artifactid> <version>4.3.18.release</version> </dependency> <dependency> <groupid>javax.jms</groupid> <artifactid>javax.jms-api</artifactid> </dependency> <dependency> <groupid>com.ibm.mq</groupid> <artifactid>com.ibm.mq.allclient</artifactid> <version>9.1.0.0</version> </dependency>
三、监听实现
代码中分为三大块:
1、mq通道连接,我这边是用的用户名密码连接,如果非密码的可不入参
2、mq的队列连接并实现监听
3、mq发送
@configuration public class mqtestconfig { @autowired private mqproperties mqproperties; /**=======================mq 通道工厂============================**/ @bean(name="mqqueueconnectionfactory") public mqqueueconnectionfactory mqqueueconnectionfactory(){ mqqueueconnectionfactory mqqueueconnectionfactory = new mqqueueconnectionfactory(); mqqueueconnectionfactory.sethostname(mqproperties.gethostname()); try { mqqueueconnectionfactory.settransporttype(wmqconstants.wmq_cm_client); mqqueueconnectionfactory.setccsid(mqproperties.getccsid()); mqqueueconnectionfactory.setchannel(mqproperties.getchannel()); mqqueueconnectionfactory.setport(mqproperties.getport()); mqqueueconnectionfactory.setqueuemanager(mqproperties.getqueuemanager()); } catch (jmsexception e) { e.printstacktrace(); } return mqqueueconnectionfactory; } @bean(name="usercredentialsconnectionfactoryadapter") public usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter(mqqueueconnectionfactory mqqueueconnectionfactory){ usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter = new usercredentialsconnectionfactoryadapter(); usercredentialsconnectionfactoryadapter.setusername(mqproperties.getusername()); usercredentialsconnectionfactoryadapter.setpassword(mqproperties.getpassword()); usercredentialsconnectionfactoryadapter.settargetconnectionfactory(mqqueueconnectionfactory); return usercredentialsconnectionfactoryadapter; } /**============================mq 消息监听接收=============================**/ //队列连接 @bean(name="mqueue") public mqqueue mqueue(){ mqqueue mqqueue = new mqqueue(); try { mqqueue.setbasequeuename(mqproperties.getbasequeuenamerecv()); mqqueue.setbasequeuemanagername(mqproperties.getbasequeuemanagername()); } catch (jmsexception e) { e.printstacktrace(); } return mqqueue; } //对队列进行监听 @bean(name="simplemessagelistenercontainer") public simplemessagelistenercontainer simplemessagelistenercontainer(usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter,mqqueue mqueue){ simplemessagelistenercontainer simplemessagelistenercontainer = new simplemessagelistenercontainer(); simplemessagelistenercontainer.setconnectionfactory(usercredentialsconnectionfactoryadapter); simplemessagelistenercontainer.setdestination(mqueue); simplemessagelistenercontainer.setmessagelistener(decmqriskrecvservice()); return simplemessagelistenercontainer; } //报文处理类 @bean(name="decmqriskrecvservice") public decmqriskrecvservice decmqriskrecvservice(){ return new decmqriskrecvservice(); } /**============================mq 发送消息============================**/ @bean(name="cachingconnectionfactory") public cachingconnectionfactory cachingconnectionfactory(usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter){ cachingconnectionfactory cachingconnectionfactory = new cachingconnectionfactory(); cachingconnectionfactory.settargetconnectionfactory(usercredentialsconnectionfactoryadapter); cachingconnectionfactory.setsessioncachesize(5); cachingconnectionfactory.setreconnectonexception(true); return cachingconnectionfactory; } @bean(name="jmstransactionmanager") public platformtransactionmanager jmstransactionmanager(cachingconnectionfactory cachingconnectionfactory){ jmstransactionmanager jmstransactionmanager = new jmstransactionmanager(); jmstransactionmanager.setconnectionfactory(cachingconnectionfactory); return jmstransactionmanager; } @bean(name="jmsoperations") public jmsoperations jmsoperations(cachingconnectionfactory cachingconnectionfactory){ jmstemplate jmstemplate = new jmstemplate(cachingconnectionfactory); jmstemplate.setreceivetimeout(mqproperties.getreceivetimeout()); return jmstemplate; } }
mq配置文件
记得要添加get和set方法
@configuration @configurationproperties(prefix=mqproperties.mq_prefix) public class mqproperties { public static final string mq_prefix = "mq"; private string hostname; private int port; private string channel; private int ccsid; private string username; private string password; private string queuemanager; private string basequeuemanagername; private string basequeuenamerecv; private string basequeuenamesend; private long receivetimeout; }
报文处理类及回执发送
1、实现类要实现messagelistener,重写onmessage方法,message就是监听到的消息。
2、读取报文时为防止乱码,我这边按照格式分两种方式读取转码。
3、发送回执,之前发送发现报文多出了一些报文头信息,所以在队列信息加了
"queue:///" + mqproperties.getbasequeuenamesend() + "?targetclient=1"
这样发送的报文会去掉报文头信息。
@service public class decmqriskrecvservice implements messagelistener { @autowired private jmsoperations jmsoperations; @autowired private mqproperties mqproperties; @override public void onmessage(message message) { string str = null; // 1、读取报文 try { if (message instanceof bytesmessage) { bytesmessage bm = (bytesmessage) message; byte[] bys = null; bys = new byte[(int) bm.getbodylength()]; bm.readbytes(bys); str = new string(bys, "utf-8"); } else { str = ((textmessage) message).gettext(); str = new string(str.getbytes("iso-8859-1"), "utf-8"); } } catch (jmsexception e) { e.printstacktrace(); } catch (unsupportedencodingexception e) { e.printstacktrace(); } // 2、处理报文 // 3、组装回执发送 string receipt = ""; try { jmsoperations.convertandsend("queue:///" + mqproperties.getbasequeuenamesend() + "?targetclient=1", receipt.getbytes("utf-8")); } catch (jmsexception e) { e.printstacktrace(); } catch (unsupportedencodingexception e) { e.printstacktrace(); } } }