rocket MQ消息队列
阿里云开发地址:https://www.aliyun.com
1.阿里云账号:springstudent2016
2.GitHub 账号:gaoweigang/298gaoweigang_20180123
注册GitHub使用的邮箱:aaa@qq.com
3.博客:http://www.aiuxian.com/article/p-1933708.html
http://blog.csdn.net/xiaojie19871116/article/details/46982907
http://blog.csdn.net/loongshawn/article/details/51086876
4.rocketmq命令:http://jameswxx.iteye.com/blog/2091971
5.linux命令大全:http://man.linuxde.net/sh
6.分布式消息队列RocketMQ部署与监控:https://my.oschina.net/boltwu/blog/472905
7.rocketmq 消息队列的顺序性问题:https://my.oschina.net/u/1589819/blog/787823
一:RocketMQ消息队列环境搭建
http://blog.csdn.net/loongshawn/article/details/51086876
注意:每次在启动Broker之前需要指定nameserver地址(或者将nameserver地址配置到环境变量之中),其中10.125.1.186为所在服务器IP,eg:export NAMESRV_ADDR=10.125.1.186:9876
二:测试RocketMQ消息队列
①创建Maven项目目录结构如下:
②pom文件依赖配置
③创建生产者
package com.alibaba.rocketmq.producer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
public class Producer {
//使用你的账号构建一个客户端实例来访问DefaultMQProducer
private static DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
private static int initialState = 0;
private Producer(){
}
public static DefaultMQProducer getDefaultMQProducer(){
if(producer == null){
producer = new DefaultMQProducer("ProducerGroupName");
}
if(initialState == 0){
producer.setNamesrvAddr("10.224.102.101:9876");//RocketMQ服务的地址
try{
producer.start();
} catch(MQClientException e){
e.printStackTrace();
}
initialState = 1;
}
return producer;
}
}
④创建消费者
package com.alibaba.rocketmq.consumer;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
public class Consumer {
private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
private static int initialState = 0;
private Consumer(){
}
public static DefaultMQPushConsumer getDefaultMQPushConsumer(){
if(consumer == null){
consumer = new DefaultMQPushConsumer("ConsumerGroupName");
}
if(initialState == 0){
consumer.setNamesrvAddr("10.224.102.101:9876");//RocketMQ服务的地址
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
initialState = 1;
}
return consumer;
}
}
⑤生产者生产消息
package com.alibaba.rocketmq.service;
import org.apache.log4j.Logger;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.producer.Producer;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
public class ProducerTtest {
private static final Logger LOGGER = Logger.getLogger(ProducerTtest.class);
public static void main(String[] args) {
sendMsg();
}
//生产者发送消息
public static void sendMsg(){
//获取消息生产者
DefaultMQProducer producer = Producer.getDefaultMQProducer();
for(int i = 0; i < 2000 ;i++){
Message msg = new Message("TopicTest1", //topic
"TagA", //tag
"OrderIDOO"+i, //key
("Hello MetaQ"+i).getBytes()); //body
SendResult sendResult;
try {
sendResult = producer.send(msg);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
⑥消费者消费消息
package com.alibaba.rocketmq.service;
import java.util.List;
import org.apache.log4j.Logger;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.consumer.Consumer;
public class ConsumerTest {
private static final Logger LOGGER = Logger.getLogger(ConsumerTest.class);
public static void main(String[] args) {
receiveMsg();
}
// 消费者接受消息
public static void receiveMsg() {
// 获取消息消费者
DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();
// 订阅主题
try {
consumer.subscribe("TopicTest1", "*");
consumer.setConsumerGroup("gaoweigang");//设置消费组
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 默认msgs里只有一条消息,可以通过设置consumerMessageBatchMaxSize参数来批量接受消息
*/
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
LOGGER.info(Thread.currentThread().getName()+" , Receive new Messages: "+msgs.size());
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("TopicTest1")) {
// 执行TopicTest1的消费逻辑
if (msg.getTags() != null
&& msg.getTags().equals("TagA")) {
// 执行TagA的消费
LOGGER.info(new String(msg.getBody()));
} else if (msg.getTags() != null
&& msg.getTags().equals("TagB")) {
// 执行TagB的消费
} else if (msg.getTags() != null
&& msg.getTags().equals("TagC")) {
// 执行TagC的消费
}
} else if (msg.getTopic().equals("TopicTest2")) {
// 执行TopicTest2的消费逻辑
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Consumer对象在使用之前必须要调用start
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
⑦执行ProducerTest,然后使用如下命令查看指定主题中的数据
RocketMQ命令:
用法: