Using RocketMQ

程序员文章站 2022-03-05 12:27:59
Code structure of the message sender (client):

[Figure: project structure of the sender (client) code]

RocketMQ configuration file:

rocketmq.properties

#wbw 2016.8.22

# RocketMQ NameServer address
#rocketmq.namesrvAddr=10.104.102.22:9876
#rocketmq.namesrvAddr=127.0.0.1:9876
rocketmq.namesrvAddr=10.**.***.**:9876

# RocketMQ producer settings
# ProducerGroupName; the application must keep it unique. It matters little for ordinary messages, but when sending distributed transaction messages the server checks back with any Producer in this group
rocketmq.producerGroup=InspurTaxMQProducer

# RocketMQ consumer settings
# ConsumerGroupName; the application must keep it unique.
rocketmq.consumerGroup=InspurTaxMQConsumer
# Consumer subscriptions, in the format topic:tag1||tag2,topic2:*,topic3:tag3 (separate multiple tags with '||'; '*' means no tag filtering)
rocketmq.subscribes=TestTopic:test||test2,TestTopic2:*
# Minimum number of consumer threads (default: 20)
rocketmq.consumeThreadMin=20
# Maximum number of consumer threads (default: 64)
rocketmq.consumeThreadMax=64
# Number of messages received per batch (default: 1)
rocketmq.consumeMessageBatchMaxSize=1
# Whether to use broadcast consumption mode (default: false)
rocketmq.isBroadcasting=false
# Maximum message size in bytes (256 KB = 262144)
rocketmq.maxMessageSize=262144
# Initial number of queues (number of WebLogic cluster nodes plus 1)
rocketmq.queueNum=10
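
The `rocketmq.subscribes` format above can be split into topic and tag-expression pairs with a few lines of Java. This is only a sketch: `SubscriptionParser` and `parse` are hypothetical names that do not appear in the original project, and the Consumer class shown later subscribes to a hard-coded topic rather than reading this property.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper; not part of the original project. */
final class SubscriptionParser {

    /** Parses "topic:tag1||tag2,topic2:*" into a topic -> tag expression map. */
    static Map<String, String> parse(String subscribes) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : subscribes.split(",")) {
            // Split on the first ':' only, so the tag expression stays intact
            String[] parts = entry.trim().split(":", 2);
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                throw new IllegalArgumentException("Bad subscription entry: " + entry);
            }
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints {TestTopic=test||test2, TestTopic2=*}
        System.out.println(parse("TestTopic:test||test2,TestTopic2:*"));
    }
}
```

The tag expression is kept as a single string because `DefaultMQPushConsumer.subscribe(topic, subExpression)` already accepts `tag1||tag2` and `*`, so each map entry could be passed to it unchanged.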

The mq.properties file:

#producer
# Each cluster node is configured with its own listening queue
default=4
Server_sstww=4
M1=1
M2=2
M3=3
M4=4

Code.java and CodeCenter.java define the set of MQ status codes:

package com.inspur.tax.common.rocketmq.data;
/**
 * Title: Code.java
 * Description: MQ status codes
 */
public class Code {
	/**
	 * Top-level status codes
	 */
	// Normal return
	public static final String SUCCESS_CODE = "0";
	// Error return
	public static final String FAIL_CODE = "1";
	/**
	 * Detailed status codes
	 */
	// Success
	public static final String CODE_0000 = "0000";
	//Server execute business method error
	public static final String CODE_0001 = "0001";
	//Send message fail
	public static final String CODE_0010 = "0010";
	//wait timeout
	public static final String CODE_0011 = "0011";
	//received message body is empty
	public static final String CODE_0012 = "0012";
	//JsonUtil.jsonStrToObject error
	public static final String CODE_0013 = "0013";
}
package com.inspur.tax.common.rocketmq.data;
/**
 * Title: CodeCenter.java
 * Description: Maps MQ status codes to their messages
 */
public class CodeCenter {
	public static String getInfo(String id) {
		if (id == null) {
			return "Unknown error";
		}
		// Match on the code string itself. Integer labels such as 0010 would be
		// octal literals in Java (0010 == 8) and would not match Integer.parseInt("0010").
		switch (id) {
		case Code.CODE_0001:
			return "Server execute business method error";
		case Code.CODE_0010:
			return "Send message fail";
		case Code.CODE_0011:
			return "wait timeout";
		case Code.CODE_0012:
			return "received message body is empty";
		case Code.CODE_0013:
			return "JsonUtil.jsonStrToObject error";
		default:
			return "Unknown error";
		}
	}
}
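
A lookup table keyed by the code string is another way to map codes to messages, and it sidesteps integer conversion altogether. The sketch below is an illustration only; `CodeMessages` is a hypothetical class name, and the messages are copied from the listing above. The last line of `main` shows why zero-padded codes and integer literals do not mix well in Java.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical alternative to a switch-based lookup; the class name is illustrative. */
final class CodeMessages {
    private static final Map<String, String> MESSAGES;

    static {
        Map<String, String> m = new HashMap<>();
        m.put("0001", "Server execute business method error");
        m.put("0010", "Send message fail");
        m.put("0011", "wait timeout");
        m.put("0012", "received message body is empty");
        m.put("0013", "JsonUtil.jsonStrToObject error");
        MESSAGES = Collections.unmodifiableMap(m);
    }

    /** Returns the message for a code, or "Unknown error" for null and unmapped codes. */
    static String getInfo(String id) {
        String info = (id == null) ? null : MESSAGES.get(id);
        return info != null ? info : "Unknown error";
    }

    public static void main(String[] args) {
        System.out.println(getInfo("0010")); // Send message fail
        System.out.println(getInfo("9999")); // Unknown error
        // A leading zero makes an integer literal octal, so 010 is eight, not ten
        System.out.println(010 == 8);        // true
    }
}
```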

MessageProperty.java reads the configuration files and exposes the MQ settings:

package com.inspur.tax.common.rocketmq;

import java.io.IOException;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.inspur.tax.utils.PropertiesLoader;

public class MessageProperty {

	private final static Logger log = LoggerFactory.getLogger(MessageProperty.class);
	private static final PropertiesLoader rocketmqProperties = new PropertiesLoader("rocketmq.properties");

	/**
	 * Reads namesrvAddr from the rocketmq.properties configuration file
	 */
	public static final String namesrvAddr = rocketmqProperties.getProperty("rocketmq.namesrvAddr");
	
	public static final String isBroadcasting = rocketmqProperties.getProperty("rocketmq.isBroadcasting");
	
//	public static final int maxMessageSize = Integer.parseInt(rocketmqProperties.getProperty("rocketmq.maxMessageSize"));
	/**
	 * Number of queues (number of WebLogic cluster nodes plus 1)
	 */
	public static final int queueNum = Integer.parseInt(rocketmqProperties.getProperty("rocketmq.queueNum"));

	public static final String producerGroup = "producerGroup_ww";

	public static final String consumerGroup = "consumerGroup_ww";
	
	public static final String producerInstanceName = producerGroup + String.valueOf(System.currentTimeMillis());
	
	public static final String consumerInstanceName = consumerGroup + String.valueOf(System.currentTimeMillis());
	
	
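	/**
	  * Title: getQueueId
	  * Description: looks up the queue id configured in mq.properties for this WebLogic server (producer side)
	  * @return the queue id
	  * @throws Exception if neither the server name nor [default] is configured
	  * Modify: GongZhf 2017-02-19 10:33:07
	 */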
	public static int getQueueId() throws Exception{
		int queueId = -1;
		String queueIdStr = null;
		String sysParam = System.getProperty("weblogic.Name");
		if(log.isInfoEnabled()){
			log.info("[property]sysParam..."+sysParam);
		}
		if(sysParam == null){
			if(log.isInfoEnabled()){
				log.info("[property]param [weblogic.Name] not found,try to get [default]...");
			}
			queueIdStr = getPropertie().getProperty("default");
		}else if(getPropertie().getProperty(sysParam) == null){
			if(log.isInfoEnabled()){
				log.info("[property]["+sysParam+"] not found in mq.properties,try to get [default]...");
			}
			queueIdStr = getPropertie().getProperty("default");
		}else{
			queueIdStr = getPropertie().getProperty(sysParam);
		}
		if(log.isInfoEnabled()){
			log.info("[property]queueId..."+queueIdStr);
		}
		if(queueIdStr != null){
			queueId = Integer.parseInt(queueIdStr);
		}else{
			if(log.isErrorEnabled()){
				log.error("[property]weblogic.Name["+sysParam+"]does not match the cluster serverName...");
			}
			throw new Exception("[property]weblogic.Name["+sysParam+"]does not match the cluster serverName...");
		}
		return queueId;
	}
	public static MessageModel getMessageModel(){
		// Only an explicit "true" enables broadcasting; a missing value falls back to clustering
		return Boolean.parseBoolean(isBroadcasting) ? MessageModel.BROADCASTING : MessageModel.CLUSTERING;
	}
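	public static Properties getPropertie() {
		Properties property = new Properties();
		// mq.properties is resolved relative to this class's package; the stream is closed after loading
		try (java.io.InputStream in = MessageProperty.class.getResourceAsStream("mq.properties")) {
			if (in == null) {
				log.error("[property]mq.properties not found on the classpath");
			} else {
				property.load(in);
			}
		} catch (IOException e) {
			log.error("[property]failed to load mq.properties", e);
		}
		return property;
	}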
}
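
The lookup that `getQueueId` performs (server name first, then the `default` entry, otherwise fail) can be condensed into a small pure function. This is a sketch under assumptions: `QueueIdLookup` and `resolve` are hypothetical names, and the properties are built in memory instead of being loaded from mq.properties.

```java
import java.util.Properties;

/** Hypothetical helper mirroring the fallback order used by getQueueId. */
final class QueueIdLookup {

    /** Resolves the queue id for a server name, falling back to the "default" entry. */
    static int resolve(Properties props, String serverName) {
        String value = (serverName == null) ? null : props.getProperty(serverName);
        if (value == null) {
            value = props.getProperty("default");
        }
        if (value == null) {
            throw new IllegalStateException(
                    "No queue id for [" + serverName + "] and no default entry");
        }
        return Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("default", "4");
        props.setProperty("M2", "2");
        System.out.println(resolve(props, "M2"));      // 2
        System.out.println(resolve(props, "unknown")); // 4
        System.out.println(resolve(props, null));      // 4
    }
}
```

Passing the `Properties` object in as a parameter also means the file only has to be loaded once, instead of on every lookup.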

Consumer:

package com.inspur.tax.common.rocketmq.factory;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.inspur.tax.common.rocketmq.MessageProperty;
/**
 * Title: Consumer.java
 * Description: TODO
 * Date: 2017-03-02 16:14:07
 * Author: GongZhf
 * modify:GongZhf 2017-03-02 16:14:07 TODO
 */
public class Consumer {
	private static DefaultMQPushConsumer consumer = null;
	
	static{
		if(consumer == null){
			consumer = new DefaultMQPushConsumer(MessageProperty.consumerGroup);
			consumer.setNamesrvAddr(MessageProperty.namesrvAddr);
			consumer.setMessageModel(MessageProperty.getMessageModel());
			try {
				consumer.subscribe("MQ_RESPONSE_TOPIC", "MQ_RESPONSE_TAG");
				// On first start, begin from the latest offset (the tail of the queue), so older messages are skipped
				consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
			} catch (MQClientException e) {
				e.printStackTrace();
			}
		}
	}
	public static DefaultMQPushConsumer getInstance(){
		return consumer;
	}
	public static void setConsumer(DefaultMQPushConsumer consumer) {
		Consumer.consumer = consumer;
	}
}

Producer:

package com.inspur.tax.common.rocketmq.factory;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.inspur.tax.common.rocketmq.MessageProperty;
/**
 * Title: Producer.java
 * Description: TODO
 */
public class Producer {
	private static DefaultMQProducer producer = null;
	
	static{
		if(producer == null){
			producer = new DefaultMQProducer(MessageProperty.producerGroup);
			producer.setInstanceName(MessageProperty.producerInstanceName);
			producer.setNamesrvAddr(MessageProperty.namesrvAddr);
//			producer.setMaxMessageSize(MessageProperty.maxMessageSize);
			try {
				producer.start();
//				producer.createTopic("WW_REQUEST_TOPIC", "WW_REQUEST_TAG", MessageProperty.queueNum);
			} catch (MQClientException e) {
				e.printStackTrace();
			}
		}
	}
	public static DefaultMQProducer getInstance(){
		return producer;
	}
	public static void setProducer(DefaultMQProducer producer) {
		Producer.producer = producer;
	}
}

MqUtil is the shared utility class for sending MQ messages:

package com.inspur.tax.common.rocketmq.util;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.inspur.tax.common.rocketmq.MessageProperty;
import com.inspur.tax.common.rocketmq.data.Code;
import com.inspur.tax.common.rocketmq.data.CodeCenter;
import com.inspur.tax.common.rocketmq.factory.Producer;
import com.inspur.tax.utils.StringUtils;
import com.inspur.tax.utils.json.JsonInput;
import com.inspur.tax.utils.json.JsonOutput;
import com.inspur.tax.utils.json.JsonUtil;
/**
 * Title: MqUtil.java
 * Description: entry point for sending MQ messages
 * Date: 2017-03-02 16:12:21
 * Author: GongZhf
 * modify:GongZhf 2017-03-02 16:12:21 TODO
 */
public class MqUtil {
	private static final Logger log = LoggerFactory.getLogger(MqUtil.class);
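	// Shared between sender threads and the listener thread, so a thread-safe map is used
	public static Map<String,BlockingQueue<Object>> globalMap = new java.util.concurrent.ConcurrentHashMap<String,BlockingQueue<Object>>();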
	
	public static JsonOutput sendMessage(JsonInput inputJson) {
		Object result = null;
		JsonOutput jsonOutput = null;
		DefaultMQProducer producer = Producer.getInstance();
		try {
			inputJson.setQueueId(MessageProperty.getQueueId());
			String requestJson = JsonUtil.objectToJsonStr(inputJson);
			String keys = StringUtils.getUUID();
			Message msg = new Message("WW_REQUEST_TOPIC", "WW_REQUEST_TAG", keys, requestJson.getBytes());
			//SendResult sendResult = producer.send(msg);
			SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
					return mqs.get(Integer.parseInt(String.valueOf(arg)));// send to the specified queue
				}
			}, MessageProperty.getQueueId());
			if(sendResult.getSendStatus().equals(SendStatus.SEND_OK)){
				if(log.isInfoEnabled()){
					log.info("[MqUtil]send ok...keys..." + keys);
				}
				BlockingQueue<Object> bq = new ArrayBlockingQueue<Object>(1);
				globalMap.put(keys, bq);
				
				result = bq.poll(60, TimeUnit.SECONDS);
				if (result == null) {
					jsonOutput = new JsonOutput(
							new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), 
							inputJson.getTranSeq(), 
							inputJson.getTranId(), 
							Code.FAIL_CODE, 
							Code.CODE_0011, 
							CodeCenter.getInfo(Code.CODE_0011), 
							inputJson.getSignatureInfo(), 
							new ArrayList<Map<String, Object>>());
					if(log.isErrorEnabled()){
						log.error("[MqUtil]BlockingQueue wait timeout...keys..." + keys);
					}
				}else if(result.toString().equals("empty")){
					jsonOutput = new JsonOutput(
							new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), 
							inputJson.getTranSeq(), 
							inputJson.getTranId(), 
							Code.FAIL_CODE, 
							Code.CODE_0012, 
							CodeCenter.getInfo(Code.CODE_0012), 
							inputJson.getSignatureInfo(), 
							new ArrayList<Map<String, Object>>());
					if(log.isErrorEnabled()){
						log.error("[MqUtil]received message body is empty...keys..." + keys);
					}
				}else{
					if(log.isInfoEnabled()){
						log.info("[MqUtil]received success...keys..." + keys + "...result..." + (String)result);
					}
					jsonOutput = JsonUtil.jsonStrToObject((String)result, JsonOutput.class);
					if(jsonOutput == null){
						jsonOutput = new JsonOutput(
								new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), 
								inputJson.getTranSeq(), 
								inputJson.getTranId(), 
								Code.FAIL_CODE, 
								Code.CODE_0013, 
								CodeCenter.getInfo(Code.CODE_0013), 
								inputJson.getSignatureInfo(), 
								new ArrayList<Map<String, Object>>());
						if(log.isErrorEnabled()){
							log.error("[MqUtil]JsonUtil.jsonStrToObject error...keys..." + keys);
						}
					}
				}
				globalMap.remove(keys);
			}else{
				jsonOutput = new JsonOutput(
						new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), 
						inputJson.getTranSeq(), 
						inputJson.getTranId(), 
						Code.FAIL_CODE, 
						Code.CODE_0010, 
						CodeCenter.getInfo(Code.CODE_0010), 
						inputJson.getSignatureInfo(), 
						new ArrayList<Map<String, Object>>());
				if(log.isErrorEnabled()){
					log.error("[MqUtil]send message fail...keys..." + keys);
				}
			}
		} catch (Exception e) {
			if(log.isErrorEnabled()){
				log.error("[MqUtil]mq error...errorMessage..." + e.getMessage());
			}
			e.printStackTrace();
		}
		return jsonOutput;
	}
}

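On the sending side (the client), MessageListener is the MQ listener class. It listens for the messages returned by the MQ server and puts them into a BlockingQueue. The MqUtil class uses

BlockingQueue<Object> bq = new ArrayBlockingQueue<Object>(1);
globalMap.put(keys, bq);

result = bq.poll(60, TimeUnit.SECONDS);

to wait up to 60 seconds for the reply to the message with the given key. (The key is normally a business-level unique identifier of the message, which makes later tracing and lookup easier; keep keys as unique as possible.)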
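
The hand-off between the sending thread and the listener thread can be sketched without RocketMQ at all. The example below is a minimal illustration, not the article's implementation: `ReplyRegistry` and its methods are hypothetical names, and the listener is simulated by a plain thread. Two assumptions go beyond the listings: the waiting queue is registered before the request would be sent, so a fast reply cannot arrive before the queue exists, and the entry is removed in a `finally` block so that it is cleaned up on timeout as well.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating the key -> BlockingQueue hand-off. */
final class ReplyRegistry {
    private final ConcurrentMap<String, BlockingQueue<String>> pending = new ConcurrentHashMap<>();

    /** Called by the sender before the request is sent. */
    BlockingQueue<String> register(String key) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        pending.put(key, queue);
        return queue;
    }

    /** Called by the listener when a reply arrives; returns false if nobody is waiting for this key. */
    boolean complete(String key, String body) {
        BlockingQueue<String> queue = pending.get(key);
        return queue != null && queue.offer(body);
    }

    /** Waits for the reply and always removes the entry; returns null on timeout or interrupt. */
    String await(String key, BlockingQueue<String> queue, long timeout, TimeUnit unit) {
        try {
            return queue.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return null;
        } finally {
            pending.remove(key);
        }
    }

    public static void main(String[] args) {
        ReplyRegistry registry = new ReplyRegistry();
        String key = "req-1";
        BlockingQueue<String> queue = registry.register(key);
        // Simulates the listener thread delivering the reply for this key
        new Thread(() -> registry.complete(key, "reply-body")).start();
        System.out.println(registry.await(key, queue, 5, TimeUnit.SECONDS));
    }
}
```

A reply whose key has no registered queue (for example one that arrives after the timeout) is simply dropped by `complete`, which is why the key carried in the reply has to be the same one the sender registered.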

Code structure of the message receiver (server):

[Figure: project structure of the receiver (server) code]

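The configuration loading, status codes, consumer, producer, message listener and MqUtil utility class are broadly the same as on the sending side. The only differences are in the MessageListener and MqUtil classes, shown below: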

package com.inspur.tax.common.rocketmq.listener;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.loushang.framework.util.SpringContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.inspur.tax.common.rocketmq.data.Code;
import com.inspur.tax.common.rocketmq.data.CodeCenter;
import com.inspur.tax.common.rocketmq.factory.Consumer;
import com.inspur.tax.common.rocketmq.util.MqUtil;
import com.inspur.tax.sst.common.service.impl.ManagerCenterMQService;
import com.inspur.tax.utils.json.JsonInput;
import com.inspur.tax.utils.json.JsonOutput;
import com.inspur.tax.utils.json.JsonUtil;

/**
 * Title: MessageListener.java
 * Description: MQ listener
 * Date: 2017-03-02 16:12:35
 * Author: GongZhf
 * modify:GongZhf 2017-03-02 16:12:35 TODO
 */
public class MessageListener implements ServletContextListener {

	private static final Logger log = LoggerFactory.getLogger(MessageListener.class);
	private static final DefaultMQPushConsumer consumer = Consumer.getInstance();

	@Override
	public void contextDestroyed(ServletContextEvent arg0) {
		consumer.shutdown();
		Consumer.setConsumer(null);
	}
	@Override
	public void contextInitialized(ServletContextEvent arg0) {
		if (log.isInfoEnabled()) {
			log.info("[MessageListener]listener starting...");
		}
		try {
			/**
			 * The production deployment does not call setAllocateMessageQueueStrategy
			 */
			consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() {
				@Override
				public String getName() {
					return consumer.getConsumerGroup();
				}

				@Override
				public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
						List<String> cidAll) {
					List<MessageQueue> submq = new ArrayList<MessageQueue>();
					for (MessageQueue mq : mqAll) {
						if (mq.getQueueId() == 4) {
							submq.add(mq);
						}
					}
					return submq;
				}
			});
			consumer.registerMessageListener(new MessageListenerConcurrently() {
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
						ConsumeConcurrentlyContext context) {
					MessageExt msgExt = (MessageExt) msgs.get(0);
					String keys = msgExt.getKeys();
					String body = new String(msgExt.getBody());
					if (log.isInfoEnabled()) {
						log.info("[MessageListener]server listened keys..." + keys + "...msgExt...."
								+ msgExt.toString());
						log.info("[MessageListener]server listened keys..." + keys + "....body....." + body);
					}
					String responseJson = null;
					JsonOutput outputJson = null;
					JsonInput inputJson = JsonUtil.jsonStrToObject(body, JsonInput.class);
					try {
						ManagerCenterMQService mcService = (ManagerCenterMQService) SpringContextHolder
								.getBean("managerCenterMQService");
						responseJson = mcService.execute(body);
						if (log.isInfoEnabled()) {
							log.info("[MessageListener]server execute business method success... keys......" + keys
									+ "...responseJson..." + responseJson);
						}
						outputJson = JsonUtil.jsonStrToObject(responseJson, JsonOutput.class);
					} catch (Exception e) {
						outputJson = new JsonOutput(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
								inputJson.getTranSeq(), inputJson.getTranId(), Code.FAIL_CODE, Code.CODE_0001,
								CodeCenter.getInfo(Code.CODE_0001), inputJson.getSignatureInfo(),
								new ArrayList<Map<String, Object>>());
						if (log.isErrorEnabled()) {
							log.error("[MessageListener]server execute business method error......keys..." + keys
									+ "...errorMessage..." + e.getMessage());
						}
						e.printStackTrace();
					}
					transmitMessage(inputJson.getQueueId(), keys, outputJson);
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			consumer.start();
		} catch (Exception e) {
			e.printStackTrace();
			if (log.isErrorEnabled()) {
				log.error("[MessageListener]server registerMessageListener error...errorMessage..." + e.getMessage());
			}
		}
	}

	public void transmitMessage(int queueId, String keys, JsonOutput outputJson) {
		MqUtil.sendMessage(queueId, keys, outputJson);
	}
}
package com.inspur.tax.common.rocketmq.util;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.inspur.tax.common.rocketmq.factory.Producer;
import com.inspur.tax.utils.json.JsonOutput;
import com.inspur.tax.utils.json.JsonUtil;
/**
 * Title: MqUtil.java
 * Description: TODO
 * Date: 2017-03-02 16:13:33
 * Author: GongZhf
 * modify:GongZhf 2017-03-02 16:13:33 TODO
 */
public class MqUtil {
	private static final Logger log = LoggerFactory.getLogger(MqUtil.class);
	public static Map<String,BlockingQueue<Object>> globalMap = new HashMap<String,BlockingQueue<Object>>();
	
	public static void sendMessage(int queueId, String keys, JsonOutput outputJson) {
		try {
			final DefaultMQProducer producer = Producer.getInstance();
			String requestJson = JsonUtil.objectToJsonStr(outputJson);
			Message msg = new Message("MQ_RESPONSE_TOPIC", "MQ_RESPONSE_TAG", keys, requestJson.getBytes());
			SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
					return mqs.get(Integer.parseInt(String.valueOf(arg)));// send to the specified queue
				}
			}, queueId);
			if(sendResult.getSendStatus().equals(SendStatus.SEND_OK)){
				if(log.isInfoEnabled()){
					log.info("[MqUtil]server send success...key..." + keys + "...queueId..." + queueId);
				}
			}else{
				if(log.isErrorEnabled()){
					log.error("[MqUtil]server sendStatus not ok...key..." + keys + "...queueId..." + queueId + "...sendStatus..." + sendResult.getSendStatus());
				}
			}
		} catch (Exception e) {
			if(log.isErrorEnabled()){
				log.error("[MqUtil]server send error...key..." + keys + "...queueId..." + queueId + "...errorMessage..." + e.getMessage());
			}
			e.printStackTrace();
		}
	}
}

 

When the server sends the reply message, make sure that the queueId and keys match the ones used by the client.
