RocketMQ使用
程序员文章站
2022-03-05 12:27:59
...
消息发送端(客户端)代码结构:
RocketMQ配置文件:
rocketmq.properties
#wbw 2016.8.22
#rocketmq的NameServer地址
#rocketmq.namesrvAddr=10.104.102.22:9876
#rocketmq.namesrvAddr=127.0.0.1:9876
rocketmq.namesrvAddr=10.**.***.**:9876
#rocketmq生产者配置
#rocketmq的ProducerGroupName,需要由应用来保证唯一。发送普通的消息时,作用不大,但是发送分布式事务消息时,服务器会回查这个Group下的任意一个Producer
rocketmq.producerGroup=InspurTaxMQProducer
#rocketmq消费者配置
#rocketmq的ConsumerGroupName,需要由应用来保证唯一。
rocketmq.consumerGroup=InspurTaxMQConsumer
#rocketmq消费者订阅情况格式为topic:tag1||tag2,topic2:*,topic3:tag3(多个tag用'||'分隔,'*'代表不过滤tag)
rocketmq.subscribes=TestTopic:test||test2,TestTopic2:*
#rocketmq最小消费者线程数(默认:20)
rocketmq.consumeThreadMin=20
#rocketmq最大消费者线程数(默认:64)
rocketmq.consumeThreadMax=64
#rocketmq批量接收消息的数量(默认:1)
rocketmq.consumeMessageBatchMaxSize=1
#rocketmq是否广播消费模式(默认:false)
rocketmq.isBroadcasting=false
#rocketmq消息最大长度(单位:字节;256KB对应的值应为262144。注意:下面的2621440000约为2.4GB,且超出int上限2147483647,无法用Integer.parseInt解析)
rocketmq.maxMessageSize=2621440000
#队列初始个数(weblogic集群节点数加1)
rocketmq.queueNum=10
mq.properties文件:
#producer
#每个集群节点配置单独的监听队列
default=4
Server_sstww=4
M1=1
M2=2
M3=3
M4=4
Code.java和CodeCenter.java定义了MQ状态码集:
package com.inspur.tax.common.rocketmq.data;
/**
 * Title: Code.java
 * Description: status codes carried in MQ request/response messages.
 *
 * <p>This is a pure constants holder, so it is final and cannot be instantiated.
 */
public final class Code {

    /*
     * Top-level status codes.
     */

    /** Normal return. */
    public static final String SUCCESS_CODE = "0";

    /** Abnormal return. */
    public static final String FAIL_CODE = "1";

    /*
     * Detail status codes.
     */

    /** Success. */
    public static final String CODE_0000 = "0000";

    /** Server execute business method error. */
    public static final String CODE_0001 = "0001";

    /** Send message fail. */
    public static final String CODE_0010 = "0010";

    /** Wait timeout. */
    public static final String CODE_0011 = "0011";

    /** Received message body is empty. */
    public static final String CODE_0012 = "0012";

    /** JsonUtil.jsonStrToObject error. */
    public static final String CODE_0013 = "0013";

    private Code() {
        // constants only; never instantiated
    }
}
package com.inspur.tax.common.rocketmq.data;
/**
 * Title: CodeCenter.java
 * Description: maps MQ detail status codes to their human-readable messages.
 */
public class CodeCenter {

    private static final String UNKNOWN_ERROR = "Unknown error";

    /**
     * Returns the message for a detail status code.
     *
     * <p>The id is parsed as a decimal integer, so "0010" and "10" are equivalent.
     * The case labels are deliberately written without leading zeros: in Java a
     * literal such as {@code 0010} is octal (8) and would never match the decimal
     * value parsed from the string "0010".
     *
     * @param id detail status code, e.g. "0011"
     * @return the message for the code; "Unknown error" for any code that is not
     *         mapped (including "0000"), and for null or non-numeric input
     */
    public static String getInfo(String id) {
        final int code;
        try {
            code = Integer.parseInt(id);
        } catch (NumberFormatException e) {
            // null or non-numeric id: there is no message to look up
            return UNKNOWN_ERROR;
        }
        switch (code) {
            case 1:
                return "Server execute business method error";
            case 10:
                return "Send message fail";
            case 11:
                return "wait timeout";
            case 12:
                return "recived message body is empty";
            case 13:
                return "JsonUtil.jsonStrToObject error";
            default:
                return UNKNOWN_ERROR;
        }
    }
}
MessageProperty.java为MQ读取配置文件获取配置信息的方法:
package com.inspur.tax.common.rocketmq;
import java.io.IOException;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.inspur.tax.utils.PropertiesLoader;
/**
 * Static access to the MQ settings read from rocketmq.properties, plus the
 * server-name to queue-id mapping read from mq.properties.
 */
public class MessageProperty {
    private final static Logger log = LoggerFactory.getLogger(MessageProperty.class);
    private static final PropertiesLoader rocketmqProperties = new PropertiesLoader("rocketmq.properties");

    /** Resource holding the queue-id mapping; resolved relative to this class. */
    private static final String QUEUE_MAPPING_RESOURCE = "mq.properties";
    /** Mapping key used when no server-specific entry is available. */
    private static final String DEFAULT_QUEUE_KEY = "default";

    /**
     * NameServer address, read from rocketmq.properties (rocketmq.namesrvAddr).
     */
    public static final String namesrvAddr = rocketmqProperties.getProperty("rocketmq.namesrvAddr");
    /** Raw value of rocketmq.isBroadcasting; interpreted by {@link #getMessageModel()}. */
    public static final String isBroadcasting = rocketmqProperties.getProperty("rocketmq.isBroadcasting");
    // Reading rocketmq.maxMessageSize was disabled in the original and stays disabled:
    // public static final int maxMessageSize = Integer.parseInt(rocketmqProperties.getProperty("rocketmq.maxMessageSize"));
    /**
     * Number of queues (number of weblogic cluster nodes plus 1).
     */
    public static final int queueNum = Integer.parseInt(rocketmqProperties.getProperty("rocketmq.queueNum"));
    public static final String producerGroup = "producerGroup_ww";
    public static final String consumerGroup = "consumerGroup_ww";
    /** Instance names get the class-initialisation timestamp appended to the group name. */
    public static final String producerInstanceName = producerGroup + String.valueOf(System.currentTimeMillis());
    public static final String consumerInstanceName = consumerGroup + String.valueOf(System.currentTimeMillis());

    /**
     * Resolves the queue id for this JVM.
     *
     * <p>The system property {@code weblogic.Name} is used as the key into
     * mq.properties. When the system property is not set, or has no entry in
     * the mapping, the {@code default} entry is used instead. The mapping file
     * is loaded once per call.
     *
     * @return the queue id configured for this server
     * @throws Exception if neither a server-specific nor a default entry exists
     */
    public static int getQueueId() throws Exception {
        final Properties mapping = getPropertie();
        final String sysParam = System.getProperty("weblogic.Name");
        log.info("[property]sysParam...{}", sysParam);

        String queueIdStr = null;
        if (sysParam == null) {
            log.info("[property]param [weblogic.Name] not found,try to get [default]...");
        } else {
            queueIdStr = mapping.getProperty(sysParam);
            if (queueIdStr == null) {
                log.info("[property][{}] not found in mq.properties,try to get [default]...", sysParam);
            }
        }
        if (queueIdStr == null) {
            queueIdStr = mapping.getProperty(DEFAULT_QUEUE_KEY);
        }
        log.info("[property]queueId...{}", queueIdStr);

        if (queueIdStr == null) {
            String message = "[property]weblogic.Name[" + sysParam + "]does not match the cluster serverName...";
            log.error(message);
            throw new Exception(message);
        }
        // Properties.load keeps trailing whitespace in values, which parseInt rejects.
        return Integer.parseInt(queueIdStr.trim());
    }

    /**
     * Translates the rocketmq.isBroadcasting setting into a message model.
     *
     * @return CLUSTERING when the setting is missing, blank or "false" (any case);
     *         BROADCASTING for every other value
     */
    public static MessageModel getMessageModel() {
        String flag = isBroadcasting == null ? "" : isBroadcasting.trim();
        if (flag.length() == 0 || "false".equalsIgnoreCase(flag)) {
            return MessageModel.CLUSTERING;
        }
        return MessageModel.BROADCASTING;
    }

    /**
     * Loads mq.properties from the classpath location of this class.
     *
     * @return the loaded mapping; empty when the resource is missing or unreadable
     *         (the problem is logged rather than thrown)
     */
    public static Properties getPropertie() {
        Properties property = new Properties();
        java.net.URL resource = MessageProperty.class.getResource(QUEUE_MAPPING_RESOURCE);
        if (resource == null) {
            log.error("[property]mq.properties not found next to MessageProperty on the classpath");
            return property;
        }
        java.io.InputStream in = null;
        try {
            in = resource.openStream();
            property.load(in);
        } catch (IOException e) {
            log.error("[property]failed to load mq.properties", e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException ignored) {
                    // nothing useful can be done if close fails
                }
            }
        }
        return property;
    }
}
Consumer消费者:
package com.inspur.tax.common.rocketmq.factory;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.inspur.tax.common.rocketmq.MessageProperty;
/**
 * Title: Consumer.java
 * Description: holder for the single shared DefaultMQPushConsumer.
 *
 * <p>The consumer is configured when this class is initialised but is not
 * started here.
 * Date: 2017-03-02 16:14:07
 * Author: GongZhf
 */
public class Consumer {

    private static DefaultMQPushConsumer consumer = buildConsumer();

    /**
     * Creates the consumer and subscribes it to the response topic/tag.
     * A subscription failure is only printed; the consumer is returned anyway.
     */
    private static DefaultMQPushConsumer buildConsumer() {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(MessageProperty.consumerGroup);
        pushConsumer.setNamesrvAddr(MessageProperty.namesrvAddr);
        pushConsumer.setMessageModel(MessageProperty.getMessageModel());
        try {
            pushConsumer.subscribe("MQ_RESPONSE_TOPIC", "MQ_RESPONSE_TAG");
            // NOTE(review): the original comment said that the first start reads from
            // the head of the queue, but the constant used is CONSUME_FROM_LAST_OFFSET.
            // Confirm which one is intended.
            pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        } catch (MQClientException subscribeFailure) {
            subscribeFailure.printStackTrace();
        }
        return pushConsumer;
    }

    /** Returns the shared consumer (null after {@code setConsumer(null)}). */
    public static DefaultMQPushConsumer getInstance() {
        return consumer;
    }

    /** Replaces the shared consumer. */
    public static void setConsumer(DefaultMQPushConsumer consumer) {
        Consumer.consumer = consumer;
    }
}
Producer生产者:
package com.inspur.tax.common.rocketmq.factory;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.inspur.tax.common.rocketmq.MessageProperty;
/**
 * Title: Producer.java
 * Description: holder for the single shared DefaultMQProducer, which is
 * configured and started when this class is initialised.
 */
public class Producer {

    private static DefaultMQProducer producer = buildProducer();

    /**
     * Creates and starts the producer. A start failure is only printed; the
     * (unstarted) producer is returned anyway.
     */
    private static DefaultMQProducer buildProducer() {
        DefaultMQProducer mqProducer = new DefaultMQProducer(MessageProperty.producerGroup);
        mqProducer.setInstanceName(MessageProperty.producerInstanceName);
        mqProducer.setNamesrvAddr(MessageProperty.namesrvAddr);
        // The original left these two calls disabled:
        //   mqProducer.setMaxMessageSize(MessageProperty.maxMessageSize);
        //   mqProducer.createTopic("WW_REQUEST_TOPIC", "WW_REQUEST_TAG", MessageProperty.queueNum);
        try {
            mqProducer.start();
        } catch (MQClientException startFailure) {
            startFailure.printStackTrace();
        }
        return mqProducer;
    }

    /** Returns the shared producer (null after {@code setProducer(null)}). */
    public static DefaultMQProducer getInstance() {
        return producer;
    }

    /** Replaces the shared producer. */
    public static void setProducer(DefaultMQProducer producer) {
        Producer.producer = producer;
    }
}
MqUtil为MQ消息发送公共类:
package com.inspur.tax.common.rocketmq.util;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.inspur.tax.common.rocketmq.MessageProperty;
import com.inspur.tax.common.rocketmq.data.Code;
import com.inspur.tax.common.rocketmq.data.CodeCenter;
import com.inspur.tax.common.rocketmq.factory.Producer;
import com.inspur.tax.utils.StringUtils;
import com.inspur.tax.utils.json.JsonInput;
import com.inspur.tax.utils.json.JsonOutput;
import com.inspur.tax.utils.json.JsonUtil;
/**
* Title: MqUtil.java
* Description: MQ发送入口
* Date: 2017年3月2日下午4:12:21
* Author: GongZhf
* modify:GongZhf 2017年3月2日下午4:12:21 TODO
*/
public class MqUtil {
private static final Logger log = LoggerFactory.getLogger(MqUtil.class);
public static Map<String,BlockingQueue<Object>> globalMap = new HashMap<String,BlockingQueue<Object>>();
public static JsonOutput sendMessage(JsonInput inputJson) {
Object result = null;
JsonOutput jsonOutput = null;
DefaultMQProducer producer = Producer.getInstance();
try {
inputJson.setQueueId(MessageProperty.getQueueId());
String requestJson = JsonUtil.objectToJsonStr(inputJson);
String keys = StringUtils.getUUID();
Message msg = new Message("WW_REQUEST_TOPIC", "WW_REQUEST_TAG", keys, requestJson.getBytes());
//SendResult sendResult = producer.send(msg);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(Integer.parseInt(String.valueOf(arg)));// 发送给指定的队列
}
}, MessageProperty.getQueueId());
if(sendResult.getSendStatus().equals(SendStatus.SEND_OK)){
if(log.isInfoEnabled()){
log.info("[MqUtil]send ok...keys..." + keys);
}
BlockingQueue<Object> bq = new ArrayBlockingQueue<Object>(1);
globalMap.put(keys, bq);
result = bq.poll(60, TimeUnit.SECONDS);
if (result == null) {
jsonOutput = new JsonOutput(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
inputJson.getTranSeq(),
inputJson.getTranId(),
Code.FAIL_CODE,
Code.CODE_0011,
CodeCenter.getInfo(Code.CODE_0011),
inputJson.getSignatureInfo(),
new ArrayList<Map<String, Object>>());
if(log.isErrorEnabled()){
log.error("[MqUtil]BlockingQueue wait timeout...keys..." + keys);
}
}else if(result.toString().equals("empty")){
jsonOutput = new JsonOutput(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
inputJson.getTranSeq(),
inputJson.getTranId(),
Code.FAIL_CODE,
Code.CODE_0012,
CodeCenter.getInfo(Code.CODE_0012),
inputJson.getSignatureInfo(),
new ArrayList<Map<String, Object>>());
if(log.isErrorEnabled()){
log.error("[MqUtil]recived message body is empty...keys..." + keys);
}
}else{
if(log.isInfoEnabled()){
log.info("[MqUtil]recived success...keys..." + keys + "...result..." + (String)result);
}
jsonOutput = JsonUtil.jsonStrToObject((String)result, JsonOutput.class);
if(jsonOutput == null){
jsonOutput = new JsonOutput(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
inputJson.getTranSeq(),
inputJson.getTranId(),
Code.FAIL_CODE,
Code.CODE_0013,
CodeCenter.getInfo(Code.CODE_0013),
inputJson.getSignatureInfo(),
new ArrayList<Map<String, Object>>());
if(log.isErrorEnabled()){
log.error("[MqUtil]JsonUtil.jsonStrToObject error...keys..." + keys);
}
}
}
globalMap.remove(keys);
}else{
jsonOutput = new JsonOutput(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
inputJson.getTranSeq(),
inputJson.getTranId(),
Code.FAIL_CODE,
Code.CODE_0010,
CodeCenter.getInfo(Code.CODE_0010),
inputJson.getSignatureInfo(),
new ArrayList<Map<String, Object>>());
if(log.isErrorEnabled()){
log.error("[MqUtil]send message fail...keys..." + keys);
}
}
} catch (Exception e) {
if(log.isErrorEnabled()){
log.error("[MqUtil]mq error...errorMessage..." + e.getMessage());
}
e.printStackTrace();
}
return jsonOutput;
}
}
MessageListener为消息发送端(客户端)的MQ监听类,监听MQ服务器返回的消息,并将其放入BlockingQueue(阻塞队列)中。MqUtil类中通过
BlockingQueue<Object> bq = new ArrayBlockingQueue<Object>(1);
globalMap.put(keys, bq);
result = bq.poll(60, TimeUnit.SECONDS);
来等待60秒获取指定key的消息返回值(Key一般用消息在业务层面的唯一标识码,可以方便后续跟踪查询,尽量保证Key的唯一性)
消息接收端(服务端)代码结构:
MQ配置文件读取、状态码集、消费者、生产者等代码与发送端总体上一致,不同之处仅在于
MessageListener类和MqUtil类,代码如下:
package com.inspur.tax.common.rocketmq.listener;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.loushang.framework.util.SpringContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.inspur.tax.common.rocketmq.data.Code;
import com.inspur.tax.common.rocketmq.data.CodeCenter;
import com.inspur.tax.common.rocketmq.factory.Consumer;
import com.inspur.tax.common.rocketmq.util.MqUtil;
import com.inspur.tax.sst.common.service.impl.ManagerCenterMQService;
import com.inspur.tax.utils.json.JsonInput;
import com.inspur.tax.utils.json.JsonOutput;
import com.inspur.tax.utils.json.JsonUtil;
/**
 * Title: MessageListener.java
 * Description: server-side MQ listener. Started and stopped with the servlet
 * context; runs the business service for each request message and sends the
 * result back through {@link MqUtil}.
 * Date: 2017-03-02 16:12:35
 * Author: GongZhf
 */
public class MessageListener implements ServletContextListener {
    private static final Logger log = LoggerFactory.getLogger(MessageListener.class);
    private static final DefaultMQPushConsumer consumer = Consumer.getInstance();
    /**
     * Only queues with this id are allocated to this consumer.
     * NOTE(review): hard-coded; presumably it should match the queue id the
     * senders are configured with. Confirm.
     */
    private static final int LISTEN_QUEUE_ID = 4;

    /** Shuts the consumer down and clears the shared instance. */
    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
        consumer.shutdown();
        Consumer.setConsumer(null);
    }

    /**
     * Installs the queue allocation strategy and the message callback, then
     * starts the consumer. Any failure is logged and not rethrown.
     */
    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        log.info("[MessageListener]listener starting...");
        try {
            // Original note (translated): "production has no
            // setAllocateMessageQueueStrategy method". Meaning unclear; confirm.
            consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() {
                @Override
                public String getName() {
                    return consumer.getConsumerGroup();
                }

                @Override
                public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                        List<String> cidAll) {
                    List<MessageQueue> submq = new ArrayList<MessageQueue>();
                    for (MessageQueue mq : mqAll) {
                        if (mq.getQueueId() == LISTEN_QUEUE_ID) {
                            submq.add(mq);
                        }
                    }
                    return submq;
                }
            });
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    // Handle every delivered message, not just the first one, so that
                    // nothing is acknowledged without having been processed.
                    for (MessageExt msgExt : msgs) {
                        handleMessage(msgExt);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            log.error("[MessageListener]server registerMessageListener error...errorMessage..." + e.getMessage(), e);
        }
    }

    /**
     * Processes one request: parses it, runs the business service and sends the
     * reply. A request that cannot be parsed is logged and dropped, because the
     * queue id needed to address a reply is part of the parsed request.
     */
    private void handleMessage(MessageExt msgExt) {
        String keys = msgExt.getKeys();
        // NOTE(review): decodes with the platform charset; must match the sender's encoding.
        String body = new String(msgExt.getBody());
        log.info("[MessageListener]server listened keys...{}...msgExt....{}", keys, msgExt);
        log.info("[MessageListener]server listened keys...{}....body.....{}", keys, body);

        JsonInput inputJson = null;
        try {
            inputJson = JsonUtil.jsonStrToObject(body, JsonInput.class);
        } catch (RuntimeException e) {
            log.error("[MessageListener]server parse request error......keys..." + keys
                    + "...errorMessage..." + e.getMessage(), e);
        }
        if (inputJson == null) {
            log.error("[MessageListener]request body is not a valid JsonInput, no reply sent...keys...{}", keys);
            return;
        }

        JsonOutput outputJson;
        try {
            ManagerCenterMQService mcService = (ManagerCenterMQService) SpringContextHolder
                    .getBean("managerCenterMQService");
            String responseJson = mcService.execute(body);
            log.info("[MessageListener]server execute business method success... keys......{}...responseJson...{}",
                    keys, responseJson);
            outputJson = JsonUtil.jsonStrToObject(responseJson, JsonOutput.class);
        } catch (Exception e) {
            outputJson = new JsonOutput(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
                    inputJson.getTranSeq(), inputJson.getTranId(), Code.FAIL_CODE, Code.CODE_0001,
                    CodeCenter.getInfo(Code.CODE_0001), inputJson.getSignatureInfo(),
                    new ArrayList<Map<String, Object>>());
            log.error("[MessageListener]server execute business method error......keys..." + keys
                    + "...errorMessage..." + e.getMessage(), e);
        }
        transmitMessage(inputJson.getQueueId(), keys, outputJson);
    }

    /**
     * Sends the reply for a request.
     *
     * @param queueId    queue id taken from the request
     * @param keys       message keys of the request, reused for the reply
     * @param outputJson reply payload
     */
    public void transmitMessage(int queueId, String keys, JsonOutput outputJson) {
        MqUtil.sendMessage(queueId, keys, outputJson);
    }
}
package com.inspur.tax.common.rocketmq.util;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.inspur.tax.common.rocketmq.factory.Producer;
import com.inspur.tax.utils.json.JsonOutput;
import com.inspur.tax.utils.json.JsonUtil;
/**
* Title: MqUtil.java
* Description: TODO
* Date: 2017年3月2日下午4:13:33
* Author: GongZhf
* modify:GongZhf 2017年3月2日下午4:13:33 TODO
*/
public class MqUtil {
private static final Logger log = LoggerFactory.getLogger(MqUtil.class);
public static Map<String,BlockingQueue<Object>> globalMap = new HashMap<String,BlockingQueue<Object>>();
public static void sendMessage(int queueId, String keys, JsonOutput outputJson) {
try {
final DefaultMQProducer producer = Producer.getInstance();
String requestJson = JsonUtil.objectToJsonStr(outputJson);
Message msg = new Message("MQ_RESPONSE_TOPIC", "MQ_RESPONSE_TAG", keys, requestJson.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(Integer.parseInt(String.valueOf(arg)));// 发送给指定的队列
}
}, queueId);
if(sendResult.getSendStatus().equals(SendStatus.SEND_OK)){
if(log.isInfoEnabled()){
log.info("[MqUtil]server send success...key..." + keys + "...queueId..." + queueId);
}
}else{
if(log.isErrorEnabled()){
log.error("[MqUtil]server sendStatus not ok...key..." + keys + "...queueId..." + queueId + "...sendStatus..." + sendResult.getSendStatus());
}
}
} catch (Exception e) {
if(log.isErrorEnabled()){
log.error("[MqUtil]server send error...key..." + keys + "...queueId..." + queueId + "...errorMessage..." + e.getMessage());
}
e.printStackTrace();
}
}
}
在服务端发送返回消息时需要注意queueID和keys应该和客户端一致。