activeMq集成 博客分类: activeMq
activeMq集成:
MQ 与 Redis 类似:先安装好服务端软件,再在 Java 端集成,通过工厂/工具类调用。MQ 可用于消息推送(结合页面轮询实现聊天工具);也可结合 Redis 缓解高并发压力——来不及处理的请求先放入队列;
还可用于多系统之间的事务回滚通知。
pom.xml
<!-- ActiveMQ-related dependencies (start) -->
<!-- JMS 1.1 API (javax.jms.*), used by the listener and converter classes -->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
<version>1.1</version>
</dependency>
<!-- Disabled: JMX artifacts jmxri / jmxtools 1.2.1, kept here commented out -->
<!-- <dependency> <groupId>com.sun.jmx</groupId> <artifactId>jmxri</artifactId>
<version>1.2.1</version> </dependency> <dependency> <groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId> <version>1.2.1</version> </dependency> -->
<!-- ActiveMQ core: provides the connection factory and topic classes referenced in activemq.xml -->
<!-- NOTE(review): activemq.xml points its schemaLocation at activemq-core-5.5.0.xsd while this
     artifact is 5.1.0; confirm which version is intended -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.1.0</version>
</dependency>
<!-- Spring JMS: JmsTemplate, MessageConverter and DefaultMessageListenerContainer -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.2.16.RELEASE</version>
</dependency>
<!-- json-lib: net.sf.json.JSONObject, used by MessageReceiver to parse the message payload -->
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<classifier>jdk15</classifier>
<version>2.4</version>
</dependency>
<!-- ActiveMQ-related dependencies (end) -->
application.xml
<!-- Pull the ActiveMQ bean definitions from activemq.xml into this application context -->
<import resource="activemq.xml" />
activemq.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
<!-- Spring JMS wiring for one ActiveMQ topic: two connection factories (send / listen),
     the topic itself, a JmsTemplate plus sender bean, and a listener container that
     dispatches incoming messages to MessageReceiver. -->
<!-- Connection factory used by the sending side (referenced by jmsTemplate) -->
<bean id="topicSendConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<!-- <property name="brokerURL" value="udp://localhost:8123" /> -->
<!-- Above: UDP transport alternative, disabled -->
<property name="brokerURL" value="tcp://10.0.1.126:61616" />
<!-- Above: TCP transport, the one in use -->
<property name="useAsyncSend" value="true" />
</bean>
<!-- Connection factory used by the listening side (referenced by listenerContainer) -->
<bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<!-- <property name="brokerURL" value="udp://localhost:8123" /> -->
<!-- Above: UDP transport alternative, disabled; it requires extra configuration on the ActiveMQ broker -->
<property name="brokerURL" value="tcp://10.0.1.126:61616" />
<!-- Above: TCP transport, the one in use -->
</bean>
<!-- Topic definition: the destination named "esteelMessage-mq" -->
<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="esteelMessage-mq" />
</bean>
<!-- Custom converter between application payloads and JMS messages -->
<bean id="messageConvertForSys" class="com.esteel.message.mq.MessageConvertForSys" />
<!-- JmsTemplate used for sending to the topic -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="topicSendConnectionFactory" />
<property name="defaultDestination" ref="myTopic" />
<property name="messageConverter" ref="messageConvertForSys" />
<!-- Delivery mode: DeliveryMode.NON_PERSISTENT=1 (not persisted); DeliveryMode.PERSISTENT=2 (persisted) -->
<!-- NOTE(review): per the Spring JmsTemplate documentation, deliveryMode is only applied when
     explicitQosEnabled is true, which is not set here; confirm whether NON_PERSISTENT is
     really in effect and whether it is wanted together with the durable subscription below -->
<property name="deliveryMode" value="1" />
<property name="pubSubDomain" value="true" />
<!-- Above: enables publish/subscribe (topic) mode -->
</bean>
<!-- Message sender: application-facing bean that publishes through jmsTemplate -->
<bean id="topicSender" class="com.esteel.message.mq.MessageSender">
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
<!-- <bean id="springContextUtil" class="com.esteel.common.SpringContextUtil" /> -->
<!-- Message receiver: the MessageListener that handles incoming topic messages -->
<bean id="topicReceiver" class="com.esteel.message.mq.MessageReceiver" />
<!-- Topic listener container; once registered it starts listening automatically -->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="topicListenConnectionFactory" />
<property name="pubSubDomain" value="true" />
<!-- Above: true = publish/subscribe (topic) mode -->
<property name="destination" ref="myTopic" />
<!-- Above: destination is myTopic -->
<property name="subscriptionDurable" value="true" />
<!-- Below: client ID of this receiver. Per the original author's note, with a durable
     subscription the messages published while this client is offline are kept in the
     broker's store until a client with this ID consumes them -->
<property name="clientId" value="clientId_esteelMessage_1" />
<property name="messageListener" ref="topicReceiver" />
</bean>
<!-- Servlet wiring, disabled -->
<!-- <bean id="ControlServlet1" class="com.esteel.servlet.ControlServlet1">
<property name="topicSender" ref="topicSender" /> </bean> -->
</beans>
自定义发送,接收,转化类:
发送:
package com.esteel.message.mq;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/**
 * Small publishing bean that delegates to a Spring {@link JmsTemplate}.
 *
 * <p>The template is supplied through {@link #setJmsTemplate(JmsTemplate)}
 * (setter injection); where the message ends up and how it is converted is
 * decided entirely by that template's configuration.
 */
public class MessageSender {

    /** Template every message is handed to; must be set before {@link #sendMessage(String)} is called. */
    private JmsTemplate jmsTemplate;

    /**
     * Injects the template used for sending.
     *
     * @param jmsTemplate the template to delegate to
     */
    public void setJmsTemplate(final JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /**
     * @return the template currently used for sending, or {@code null} if none was injected
     */
    public JmsTemplate getJmsTemplate() {
        return this.jmsTemplate;
    }

    /**
     * Hands the given text to the template's {@code convertAndSend}.
     *
     * @param msg the payload to publish
     */
    public void sendMessage(final String msg) {
        this.jmsTemplate.convertAndSend(msg);
    }
}
接收:
package com.esteel.message.mq;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import com.esteel.exception.EsteelException;
import com.esteel.message.bean.TbConObj;
import com.esteel.message.bean.TbConOrd;
import com.esteel.message.bean.TbConOrdPrice;
import com.esteel.message.beanVo.TbConOrdVo;
import com.esteel.message.redis.service.RedisService;
import com.esteel.message.service.TbConObjService;
import com.esteel.message.service.TbConOrdPriceService;
import com.esteel.message.service.TbConOrdService;
import net.sf.json.JSONObject;
/**
 * JMS listener that turns an incoming bargaining message into database rows.
 *
 * <p>The payload is a JSON string carried in the {@code key_esteelMessage} message
 * property. When its {@code objectName} is {@code "TbConOrdVo"}, the listener
 * inserts a {@link TbConOrd} and a {@link TbConOrdPrice}, updates the price range
 * of the related {@link TbConObj}, and clears the affected list caches. Any other
 * payload is reported and ignored.
 *
 * <p>NOTE(review): this class is also declared as the {@code topicReceiver} bean in
 * activemq.xml; if component scanning covers this package, {@code @Controller} would
 * register a second instance — confirm the annotation is really needed.
 */
@Controller
public class MessageReceiver implements MessageListener {

    /** Name of the JMS string property that carries the JSON payload. */
    private static final String PAYLOAD_PROPERTY = "key_esteelMessage";

    /** The only {@code objectName} value this listener processes. */
    private static final String SUPPORTED_OBJECT_NAME = "TbConOrdVo";

    /** Input-box placeholder text; a message equal to it is stored as an empty string. */
    private static final String MSG_TEXT_PLACEHOLDER = "请录入您的议价留言,最大为300个字符!按Ctrl+Enter提交!";

    @Autowired
    TbConOrdService tbConOrdService;
    @Autowired
    TbConOrdPriceService tbConOrdPriceService;
    @Autowired
    TbConObjService tbConObjService;
    @Autowired
    RedisService redisService;

    /**
     * Handles one message from the topic.
     *
     * <p>JMS and {@link EsteelException} failures are printed and not rethrown, as before.
     *
     * @param m the received message; only its {@code key_esteelMessage} string property is read
     */
    public void onMessage(Message m) {
        try {
            // getStringProperty is declared on Message, so no cast to ObjectMessage is needed.
            String payload = m.getStringProperty(PAYLOAD_PROPERTY);
            if (payload == null) {
                System.err.println("MQ message ignored: property " + PAYLOAD_PROPERTY + " is missing");
                return;
            }
            JSONObject envelope = JSONObject.fromObject(payload);
            String objectName = (String) envelope.get("objectName");
            // Constant-first comparison: a missing objectName is ignored instead of throwing.
            if (!SUPPORTED_OBJECT_NAME.equals(objectName)) {
                System.out.println("MQ message ignored: unsupported objectName " + objectName);
                return;
            }

            JSONObject voJson = (JSONObject) envelope.get("object");
            TbConOrdVo tbConOrdVo = (TbConOrdVo) JSONObject.toBean(voJson, TbConOrdVo.class);
            String ipAddress = (String) envelope.get("ipAddress");

            // Build the order from the VO and persist it.
            TbConOrd tbConOrd = copyTbConOrd(tbConOrdVo, new TbConOrd(), ipAddress);
            tbConOrd = tbConOrdService.insertTbConOrd(tbConOrd);

            // Build the price/chat record from the persisted order and persist it.
            TbConOrdPrice tbConOrdPrice = copyTbConOrdPrice(tbConOrd, tbConOrdVo, new TbConOrdPrice());
            // The untouched placeholder is not a real message; null text is left as is.
            if (MSG_TEXT_PLACEHOLDER.equals(tbConOrdPrice.getMsgText())) {
                tbConOrdPrice.setMsgText("");
            }
            tbConOrdPrice = tbConOrdPriceService.insertTbConOrdPrice(tbConOrdPrice);

            // Update the price range on the listing the order belongs to.
            TbConObj tbConObj = tbConObjService.getTbConObjById(tbConOrd.getConobjKey());
            if (tbConObj == null) {
                System.err.println("MQ message: no TbConObj found for conobjKey "
                        + tbConOrd.getConobjKey() + ", price range not updated");
                return;
            }
            updatePriceRange(tbConObj, tbConOrd, tbConOrdPrice);
            tbConObj = tbConObjService.updateTbConObj(tbConObj);

            tbConObjService.listClearCache(tbConObj.getConobjKey());
            tbConOrdService.listClearCache();
            tbConOrdService.listClearCache(tbConObj.getConobjKey(), tbConOrdVo.getTradeCustomerKey());

            // Printed only after the writes above actually happened.
            System.out.println("==============MQ Write to Database success============");
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (EsteelException e) {
            e.printStackTrace();
        }
    }

    /**
     * Moves the low/high price of the listing according to who made the new offer.
     *
     * <p>Contract trade type {@code "A"} is a sale listing, anything else a purchase
     * listing. An offer whose {@code ordpriceMan} is {@code null} or {@code "A"} comes
     * from the guest; any other value comes from the listing owner.
     */
    private void updatePriceRange(TbConObj tbConObj, TbConOrd tbConOrd, TbConOrdPrice tbConOrdPrice) {
        boolean offerFromGuest = tbConOrdPrice.getOrdpriceMan() == null
                || "A".equals(tbConOrdPrice.getOrdpriceMan());

        if (tbConObj.getContradeType().equals("A")) {
            // Sale listing: the high price defaults to the listing price.
            if (tbConObj.getHighPrice() == null) {
                tbConObj.setHighPrice(tbConObj.getOrderPrice());
            }
            if (offerFromGuest) {
                tbConObj.setLowPrice(tbConOrd.getOrderPrice());
            } else {
                tbConObj.setHighPrice(tbConOrd.getOrderPrice());
            }
        } else {
            // Purchase listing: the low price defaults to the listing price.
            if (tbConObj.getLowPrice() == null) {
                tbConObj.setLowPrice(tbConObj.getOrderPrice());
            }
            if (offerFromGuest) {
                tbConObj.setHighPrice(tbConOrd.getOrderPrice());
            } else {
                tbConObj.setLowPrice(tbConOrd.getOrderPrice());
            }
        }
    }

    /**
     * Looks up an existing order by listing key and customer key.
     * Currently not called from within this class.
     *
     * @return whatever {@code getTbConOrdByObj} returns for the probe, possibly {@code null}
     */
    private TbConOrd getExistTbConOrd(TbConOrd tbConOrd) throws EsteelException {
        TbConOrd probe = new TbConOrd();
        probe.setConobjKey(tbConOrd.getConobjKey());
        probe.setCustomerKey(tbConOrd.getCustomerKey());
        return tbConOrdService.getTbConOrdByObj(probe);
    }

    /**
     * Fills {@code tbConOrd} from the VO plus fixed defaults and returns it.
     *
     * <p>The VO's own orderPrice, tradeComm, dealBail, subsidyInterest and pickType are
     * deliberately not copied: the values assigned below always replaced them.
     *
     * @param tbConOrdVo source VO
     * @param tbConOrd   order to populate
     * @param ipAddress  stored as the order IP
     * @return the same {@code tbConOrd} instance
     */
    private TbConOrd copyTbConOrd(TbConOrdVo tbConOrdVo, TbConOrd tbConOrd, String ipAddress) {
        // One clock reading so that tradeDate, orderTime and disTime agree.
        Calendar now = Calendar.getInstance();

        // Values taken over from the VO.
        tbConOrd.setConobjKey(tbConOrdVo.getConobjKey());
        tbConOrd.setContradeType(tbConOrdVo.getContradeType());
        tbConOrd.setCanTradeTypes(tbConOrdVo.getCanTradeTypes());
        tbConOrd.setBailBankCode(tbConOrdVo.getBailBankCode());
        tbConOrd.setDisIp(tbConOrdVo.getDisIp());
        tbConOrd.setStartTranDate(tbConOrdVo.getStartTranDate());
        tbConOrd.setBackCause(tbConOrdVo.getBackCause());
        tbConOrd.setUponRange(tbConOrdVo.getUponRange());
        tbConOrd.setAddsubMark(tbConOrdVo.getAddsubMark());

        // Values set by the receiver.
        tbConOrd.setTradeDate(new SimpleDateFormat("yyyy-MM-dd").format(now.getTime()));
        // tradeCustomerKey replaces customerKey.
        tbConOrd.setCustomerKey(tbConOrdVo.getTradeCustomerKey());
        tbConOrd.setOrderNum(tbConOrdVo.getNewNum());
        tbConOrd.setOrderPrice(tbConOrdVo.getNewPrice());
        tbConOrd.setCntPrice(tbConOrdVo.getNewPrice());
        tbConOrd.setOrderStatus("A");
        tbConOrd.setTradeComm("0");
        tbConOrd.setDealBail("0");
        tbConOrd.setOrderTime(now.getTime());
        tbConOrd.setOrderIp(ipAddress);
        tbConOrd.setDisTime(tbConOrd.getOrderTime());
        tbConOrd.setSubsidyInterest("0");
        tbConOrd.setMarkchkStatus("A");
        tbConOrd.setShouldPayBail("0");
        tbConOrd.setShouldPayMoney("0");
        tbConOrd.setIsBailPayGoods("Y");
        tbConOrd.setPickType("A");
        return tbConOrd;
    }

    /**
     * Fills {@code tbConOrdPrice} from the persisted order and the VO and returns it.
     *
     * @param tbConOrd      persisted order supplying keys, price and terms
     * @param tbConOrdVo    VO supplying the message text and {@code ordpriceMan}
     * @param tbConOrdPrice price record to populate
     * @return the same {@code tbConOrdPrice} instance
     */
    private TbConOrdPrice copyTbConOrdPrice(TbConOrd tbConOrd, TbConOrdVo tbConOrdVo, TbConOrdPrice tbConOrdPrice) throws EsteelException {
        // Keys and terms copied from the order.
        tbConOrdPrice.setOrdKey(tbConOrd.getOrdKey());
        tbConOrdPrice.setCustomerKey(tbConOrd.getCustomerKey());
        tbConOrdPrice.setKfCustomerKey(tbConOrd.getKfCustomerKey());
        tbConOrdPrice.setNewNum(tbConOrd.getOrderNum());
        tbConOrdPrice.setNewPrice(tbConOrd.getOrderPrice());
        tbConOrdPrice.setTradeComm(tbConOrd.getTradeComm());
        tbConOrdPrice.setDealBail(tbConOrd.getDealBail());
        tbConOrdPrice.setIrdTime(tbConOrd.getOrderTime());
        tbConOrdPrice.setCdListKeys(tbConOrd.getCdListKeys());
        tbConOrdPrice.setStartTranDate(tbConOrd.getStartTranDate());
        tbConOrdPrice.setSubsidyInterest(tbConOrd.getSubsidyInterest());
        tbConOrdPrice.setUponRange(tbConOrd.getUponRange());
        tbConOrdPrice.setAddsubMark(tbConOrd.getAddsubMark());
        tbConOrdPrice.setUponRange2(tbConOrd.getUponRange2());
        tbConOrdPrice.setAddsubMark2(tbConOrd.getAddsubMark2());

        // Values copied from the VO.
        tbConOrdPrice.setMsgText(tbConOrdVo.getMsgText());
        tbConOrdPrice.setOrdpriceMan(tbConOrdVo.getOrdpriceMan());

        // Fixed defaults.
        tbConOrdPrice.setIsNewprice("Y");
        tbConOrdPrice.setOrdpriceType("A");
        tbConOrdPrice.setIsLook("N");
        tbConOrdPrice.setLookTimes("0");

        // The sequence number was meant to be "latest + 1"; the current time in
        // milliseconds is used in its place.
        tbConOrdPrice.setOrdpriceNo(String.valueOf(System.currentTimeMillis()));
        return tbConOrdPrice;
    }
}
转化:
package com.esteel.message.mq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
/**
 * {@link MessageConverter} that carries the payload's string form in a JMS
 * property instead of the message body.
 *
 * <p>Outgoing: an {@link ObjectMessage} with no body is created and
 * {@code object.toString()} is stored in the {@code key_esteelMessage} string
 * property. Incoming: the value of that property is returned.
 */
public class MessageConvertForSys implements MessageConverter {

    /** Name of the JMS property that carries the payload. */
    private static final String PAYLOAD_PROPERTY = "key_esteelMessage";

    /**
     * Wraps the payload's string form into a new message.
     *
     * @param object  payload; its {@code toString()} is what gets sent
     * @param session session used to create the message
     * @return an {@link ObjectMessage} carrying the payload in its string property
     * @throws JMSException               if the JMS provider fails to create or populate the message
     * @throws MessageConversionException if {@code object} is {@code null}
     */
    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        if (object == null) {
            // Report a conversion failure rather than letting a bare NullPointerException escape.
            throw new MessageConversionException("Cannot convert a null payload to a JMS message");
        }
        String payload = object.toString();
        System.out.println("sendMessage:" + payload);
        // The message type stays ObjectMessage so existing consumers that cast to it keep working.
        ObjectMessage objectMessage = session.createObjectMessage();
        objectMessage.setStringProperty(PAYLOAD_PROPERTY, payload);
        return objectMessage;
    }

    /**
     * Extracts the payload from a received message.
     *
     * @param message the received message, of any JMS message type
     * @return the value of the {@code key_esteelMessage} property, or {@code null} if it is not set
     * @throws JMSException               if the JMS provider fails to read the property
     * @throws MessageConversionException if {@code message} is {@code null}
     */
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        if (message == null) {
            throw new MessageConversionException("Cannot extract a payload from a null JMS message");
        }
        // Properties are readable on every Message, so no cast to ObjectMessage is required.
        return message.getObjectProperty(PAYLOAD_PROPERTY);
    }
}
推荐阅读
-
activeMq集成 博客分类: activeMq
-
activeMq多个监听配置 博客分类: activeMq
-
activemq多群负载均衡 博客分类: activeMq activemq
-
测试ActiveMQ主从复制 博客分类: activeMq activemq
-
rabbitMq集成Spring后,消费者设置手动ack,并且在业务上控制是否ack 博客分类: rabbitmq
-
CKEditor3.0在asp.net环境下上传文件的配置,集成CKFinder 博客分类: ASP.NET ASP.netASP.netfckeditorJavaScript
-
jenkins部署时,弹出提示部署不成功 博客分类: 持续集成 jenkinscargodeployContainerException
-
rocketMq实战(2)-客户端集成 博客分类: rocketmq rocketmq客户端
-
rocketMq实战(2)-客户端集成 博客分类: rocketmq rocketmq客户端
-
activemq点对点消息传送 博客分类: activemq activemq