欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rocket MQ消息队列

程序员文章站 2022-05-18 10:05:02
...

阿里云开发地址:https://www.aliyun.com

1.阿里云账号:springstudent2016  

2.GitHub 账号:gaoweigang/298gaoweigang_20180123

  注册GitHub使用的邮箱:aaa@qq.com

3.博客:http://www.aiuxian.com/article/p-1933708.html

          http://blog.csdn.net/xiaojie19871116/article/details/46982907

          http://blog.csdn.net/loongshawn/article/details/51086876

4.rocketmq命令:http://jameswxx.iteye.com/blog/2091971

5.linux命令大全:http://man.linuxde.net/sh

6.分布式消息队列RocketMQ部署与监控:https://my.oschina.net/boltwu/blog/472905

7.rocketmq 消息队列的顺序性问题:https://my.oschina.net/u/1589819/blog/787823

一:RocketMQ消息队列环境搭建

http://blog.csdn.net/loongshawn/article/details/51086876

注意:每次在启动Broker之前需要指定nameserver地址(或者将nameserver地址配置到环境变量之中),其中10.125.1.186为所在服务器IP,eg:export NAMESRV_ADDR=10.125.1.186:9876

 

二:测试RocketMQ消息队列

①创建Maven项目目录结构如下:

rocket MQ消息队列

②pom文件依赖配置

rocket MQ消息队列
 

③创建生产者

package com.alibaba.rocketmq.producer;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

public class Producer {
	
	
	//使用你的账号构建一个客户端实例来访问DefaultMQProducer
	private static DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
	private static int initialState = 0;
	
	private Producer(){
		
	}
	
	public static DefaultMQProducer getDefaultMQProducer(){
		if(producer == null){
			producer = new DefaultMQProducer("ProducerGroupName");
		}
		if(initialState == 0){
			producer.setNamesrvAddr("10.224.102.101:9876");//RocketMQ服务的地址
			try{
				producer.start();
			} catch(MQClientException e){
				e.printStackTrace();
			}
			initialState = 1;
		}
		
		return producer;
	}

}

 

④创建消费者

package com.alibaba.rocketmq.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

public class Consumer {
	
	private static DefaultMQPushConsumer  consumer = new DefaultMQPushConsumer("ConsumerGroupName");
    private static int initialState = 0;
    
    private Consumer(){
    	
    }
    
    public static DefaultMQPushConsumer  getDefaultMQPushConsumer(){
    	if(consumer == null){
    		consumer = new DefaultMQPushConsumer("ConsumerGroupName");
    	}
    	
    	if(initialState == 0){
    		consumer.setNamesrvAddr("10.224.102.101:9876");//RocketMQ服务的地址
    		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    		initialState = 1;
    	}
    	return consumer;
    }
}

⑤生产者生产消息

package com.alibaba.rocketmq.service;

import org.apache.log4j.Logger;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.producer.Producer;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

public class ProducerTtest {
	
	private static final Logger LOGGER = Logger.getLogger(ProducerTtest.class);
	
	public static void main(String[] args) {
		sendMsg();
	}
	
	//生产者发送消息
	public static void sendMsg(){
		//获取消息生产者
		DefaultMQProducer producer = Producer.getDefaultMQProducer();
		
		for(int i = 0; i < 2000 ;i++){
			Message msg = new Message("TopicTest1",   //topic
					                  "TagA",         //tag
					                  "OrderIDOO"+i,  //key
					                  ("Hello MetaQ"+i).getBytes()); //body
			
			SendResult sendResult;
			try {
				sendResult = producer.send(msg);
			} catch (MQClientException e) {
				e.printStackTrace();
			} catch (RemotingException e) {
				e.printStackTrace();
			} catch (MQBrokerException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		producer.shutdown();
	}

}

 

⑥消费者消费消息

package com.alibaba.rocketmq.service;

import java.util.List;

import org.apache.log4j.Logger;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.consumer.Consumer;

public class ConsumerTest {

	private static final Logger LOGGER = Logger.getLogger(ConsumerTest.class);
	
	public static void main(String[] args) {
		receiveMsg();
	}

	// 消费者接受消息
	public static void receiveMsg() {
		// 获取消息消费者
		DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();

		// 订阅主题
		try {
			consumer.subscribe("TopicTest1", "*");
			consumer.setConsumerGroup("gaoweigang");//设置消费组
			consumer.registerMessageListener(new MessageListenerConcurrently() {

				/**
				 * 默认msgs里只有一条消息,可以通过设置consumerMessageBatchMaxSize参数来批量接受消息
				 */
				public ConsumeConcurrentlyStatus consumeMessage(
						List<MessageExt> msgs,
						ConsumeConcurrentlyContext context) {

					LOGGER.info(Thread.currentThread().getName()+" , Receive new Messages: "+msgs.size());
					MessageExt msg = msgs.get(0);

					if (msg.getTopic().equals("TopicTest1")) {
						// 执行TopicTest1的消费逻辑
						if (msg.getTags() != null
								&& msg.getTags().equals("TagA")) {
							// 执行TagA的消费
							LOGGER.info(new String(msg.getBody()));
						} else if (msg.getTags() != null
								&& msg.getTags().equals("TagB")) {
							// 执行TagB的消费
						} else if (msg.getTags() != null
								&& msg.getTags().equals("TagC")) {
							// 执行TagC的消费
						}
					} else if (msg.getTopic().equals("TopicTest2")) {
						// 执行TopicTest2的消费逻辑
					}

					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			// Consumer对象在使用之前必须要调用start
			consumer.start();

		} catch (MQClientException e) {

			e.printStackTrace();
		}
	}

}

⑦执行ProducerTest,然后使用如下命令查看指定主题中的数据

rocket MQ消息队列

RocketMQ命令:

rocket MQ消息队列
用法:

rocket MQ消息队列