RocketMq——windows环境搭建与入门
一、准备工作
文件已上传百度云网盘
运行当前最新版本的RockitMQ(v4.4.0),必须先安装64bit的JDK1.8或以上版本。
从RockitMQ官网 http://rocketmq.apache.org/release_notes/
下载最新的release包。
上图是rocketmq-all-4.4.0-bin-release.zip
包解压后的目录结构。bin
目录下存放可运行的脚本。
二、RocketMQ基本结构
在动手开发之前,我们需要了解一下RocketMQ的基本结构。
RocketMQ基本结构
如上图所示,一个正常工作的RocketMQ包括四个部分。
- NameServer :基于高可用设计产生的,用于服务发现和路由。正式应用时通常采用集群部署。
- Broker:实现队列机制,负责消息存储和转发。正式应用时也采用集群部署。
- Producer:消息生产者,生成消息并发送到RocketMQ中,生产者通常是我们自己实现的应用程序。
- Consumer:消息消费者,从RocketMQ中接收消息并进行业务处理。这部分也通常是我们自己实现的。
三、Windows环境下启动最小应用
从上面的图可以了解到,RocketMQ自身分为 NameServer 和 Broker 两个部分,因此,用作本机开发调试用的最小应用,应该分别启动一个NameServer和一个Broker节点。
RocketMQ默认提供了 windows环境 和 linux环境 下的启动脚本。脚本位于bin
目录下,windows的脚本以.cmd
为文件名后缀,linux环境的脚本以.sh
为文件名后缀。
不过,通常情况下,windows下的脚本双击启动时,都是窗口一闪而过,启动失败。下面的内容就帮大家解决这些问题。
第一步,配置 JAVA_HOME 和 ROCKETMQ_HOME 环境变量
JAVA_HOME 的配置已经是老生常谈,这里不再赘述,不懂的话请自行百度。
ROCKETMQ_HOME 应指向解压后的Readme.md
文件所在目录。
如上面的第一张图,我的 ROCKETMQ_HOME 应配置为
D:\programs\rocketmq\rocketmq-all-4.4.0-bin-release
第二步,启动 NameServer
NameServer的启动脚本是bin
目录下的mqnamesrv.cmd
。
上文讲过,即使配置好了ROCKETMQ_HOME环境变量,mqnamesrv.cmd
的启动通常也以失败告终。
阅读mqnamesrv.cmd
脚本,发现其实际上是调用了runserver.cmd
脚本来实现启动的动作。
而在runserver.cmd
脚本,java的默认启动参数中,启动时堆内存的大小为2g,老旧一点的机器上根本没有这么多空闲内存。
因此,用编辑器修改一下runserver.cmd
脚本。将原来的内存参数注释掉(cmd脚本使用rem关键字),修改为:
rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m"
直接双击mqnamesrv.cmd
脚本启动NameServer。
NameServer启动显示
看到 The Name Server boot success
字样,表示NameServer己启动成功。
windows环境下,可以在目录
%USERPROFILE%\logs\rocketmqlogs
下找到NameServer的启动日志。文件名为namesrv.log
。
第三步,启动 Broker
Broker的启动脚本是mqbroker.cmd
。
与mqnamesrv.cmd
脚本类似,mqbroker.cmd
是调用runbroker.cmd
脚本启动Broker的。
同样的,优化一下runbroker.cmd
的启动内存
rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g"
set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m"
此外,Broker脚本启动之前要指定 NameServer的地址。
NameServer默认启动端口是9876,这点可以从NameServer的启动日志中找到记录。
因此,还需要修改mqbroker.cmd
脚本,增加NameServer的地址。
rem 添加此行,指定NameServer的地址
set "NAMESRV_ADDR=localhost:9876"
rem 在此行之前添加NameServer的地址
call "%ROCKETMQ_HOME%\bin\runbroker.cmd" org.apache.rocketmq.broker.BrokerStartup %*
双击mqbroker.cmd脚本启动Broker。
Broker启动成功
看到 The broker ... boot success
字样,表示Broker己启动成功。
与NameServer类似,可以在目录
%USERPROFILE%\logs\rocketmqlogs
下找到Broker的启动日志。文件名为broker.log
。
四、验证RocketMQ功能
RocketMQ自带了恬送与接收消息的脚本tools.cmd
,用来验证RocketMQ的功能是否正常。
tool.cmd脚本需要带参数执行,无法用简单的双击方式启动。因此,我们打开一个cmd窗口,并跳转到bin目录下。
打开cmd窗口并跳转到bin目录下
启动消费者
与mqbroker.cmd
脚本类似,启动tool.cmd
命令之前我们要指定NameServer地址。
这里我们采用命令方式指定,并启动消费者。依次执行如下命令:
set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer
启动消费者成功
启动生产者
再打开一个cmd窗口,依次执行如下命令:
set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer
生产者启动命令
启动成功后,生产者会发送1000个消息,然后自动退出。
生产者发送消息并退出
此时,在消费者界面按下Ctrl + C
,就会收到刚刚生产者发出的消息。
消费者接收消息
至此,RocketMQ最小应用己经可以正常工作,能满足我们开发环境下调试代码的需求。
这部分的内容做了几个简单的bat文件,为安全按照上面修改,但能完成该功能,网盘中能找到对应文件
五、java代码实现
生产者:
package com.suirui.mq.rocket;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* @Author zongx
* @Date 2020/5/7 17:08
* @Version 1.0
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//以group名字创建一个producer
DefaultMQProducer producer = new DefaultMQProducer("producer1");
//设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
//防止connect to <ip:xxx> failed rocketmq默认开启了vip通道
producer.setVipChannelEnabled(false);
//为避免程序启动的时候报错,添加此代码,可以让rocketMq自动创建topickey
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
//启动生产者
producer.start();
for(int i = 0; i < 10; i++){
try {
/*
* 1、topic
* Message都有Topic这一属性,Producer发送指定Topic的消息,Consumer订阅Topic下的消息。
* 通过Topic字段,Producer会获取消息投递的路由信息,决定发送给哪个Broker。
* topic是单独使用命令进行创建的。此处使用默认的orderTopic
* 2、flag
* 网络通信层标记。(没确定用处)
* 3、body
* Producer要发送的实际消息内容,以字节数组形式进行存储。Message消息有一定大小限制
* 4、transactionId
* RocketMQ 4.3.0引入的事务消息相关的事务编号
* 5、properties
* 该字段为一个HashMap,存储了Message其余各项参数,比如tag、key等关键的消息属性。
* RocketMQ预定义了一组内置属性,除了内置属性之外,还可以设置任意自定义属性。
* 当然属性的数量也是有限的,消息序列化之后的大小不能超过预设的最大消息大小。
* 6、tag
* 不同的消费组,订阅同一 topic 不同的 tag,拉取不同的消息并消费。在 topic 内部对消息进行隔离。
* */
Message message = new Message("TopicTest", "Tag1",
("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult send = producer.send(message);
System.out.println("发送的消息Id: " + send.getMsgId() +"-------发送消息的状态:" + send.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
}
}
消费者:
DefaultMQPushConsumer
使用 DefaultMQPushConsumer 主要是设置好各种参数和传人处理消息的函数 。 系统收到消息后自动调用处理函数来处理消息,自动保存 Offset,而且加入新的 DefaultMQPushConsumer后会自动做负载均衡。
DefaultMQPushConsumer需要设置三 个 参数 : 一 是这个 Consumer 的 GroupName,二是 NameServer 的地址和端 口号,三是 Topic 的名称 ,下面将分 别进行详细介绍 。
(1) Consumer的 GroupName用于把多个 Consumer组织到一起, 提高并发 处理能力, GroupName需要和消息模式 (MessageModel)配合使用。
RocketMQ支持两种消息模式: Clustering和Broadcasting。
在 Clustering模式下,同一个 ConsumerGroup(GroupName相同) 里的每 个 Consumer 只消费所订阅消 息的一部分 内容, 同一个 ConsumerGroup 里所有的 Consumer消费的内容合起来才是所订阅 Topic 内容的整体, 从而达到负载均衡的目的 。
在 Broadcasting模式下,同一个 ConsumerGroup里的每个 Consumer都 能消费到所订阅 Topic 的全部消息,也就是一个消 息会被多次分发,被 多个 Consumer消费。
(2)NameServer 的地址和端口号,可以填写多个,用分号隔开,达到消除单点故障的目的,比如 “ip1:port;ip2:port” 。
(3)Topic名称用来标识消息类型, 需要提前创建。如果不需要消费某 个 Topic 下的所有消息,可以通过指定消息的 Tag 进行消息过滤,比如: Consumer.subscribe (”TopicTest”,’tag1 || tag2 || tag3”), 表示这个 Consumer要 消费“ TopicTest”下带有 tag1 或 tag2 或 tag3 的消息。在填写 Tag 参数的位置,用 null 或者“ *“ 表示要消费这个 Topic 的所有消息 。
package com.suirui.mq.rocket;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author zongx
* @Date 2020/5/7 17:59
* @Version 1.0
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
/*
* DefaultMQPushConsumer需要设置三 个 参数 :
* 一 是这个 Consumer 的 GroupName.二是 NameServer 的地址和端 口号,三是 Topic 的名称,下面将分 别进行详细介绍 。
* (1) Consumer的 GroupName用于把多个 Consumer组织到一起, 提高并发 处理能力,
* GroupName需要和消息模式 (MessageModel)配合使用。
* RocketMQ支持两种消息模式: Clustering和Broadcasting。
* 1、在 Clustering模式下,同一个 ConsumerGroup(GroupName相同) 里的每 个 Consumer 只消费所订阅消 息的一部分 内容,
* 同一个 ConsumerGroup 里所有的 Consumer消费的内容合起来才是所订阅 Topic 内容的整体, 从而达到负载均衡的目的 。
* 2、在 Broadcasting模式下,同一个 ConsumerGroup里的每个 Consumer都 能消费到所订阅 Topic 的全部消息,
* 也就是一个消息会被多次分发,被 多个 Consumer消费。
* (2)NameServer 的地址和端口 号,可以填写多个 ,用分号隔开,达到消除单点故障的目的,比如 “ip1:port;ip2:port;ip3 :port” 。
* (3)Topic名称用来标识消息类型, 需要提前创建。如果不需要消费某 个 Topic 下的所有消息,可以通过指定消息的 Tag 进行消息过滤,比如: Consumer.subscribe (”TopicTest”,’tag1 || tag2 || tag3”),
* 表示这个 Consumer要 消费“ TopicTest”下带有 tag1 或 tag2 或 tag3 的消息。在填写 Tag 参数的位置,用 null 或者“ *“ 表示要消费这个 Topic 的所有消息 。
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_LRW_DEV_SUBS");
consumer.setVipChannelEnabled(false);
consumer.setNamesrvAddr("localhost:9876");
//设置消费者端消息拉取策略,表示从哪里开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置消费者拉取消息的策略,*表示消费该topic下的所有消息,也可以指定tag进行消息过滤
consumer.subscribe("TopicTest","*");
//消费者端启动消息监听,一旦生产者发消息被监听到,就打印消息。和rabbitmq的handlerDelivery类似
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
byte[] body = messageExt.getBody();
String msg = new String(body);
String msgId = messageExt.getMsgId();
System.out.println("*********************************");
System.out.println("消费响应:msgId : " + msgId + ", msgBody : " + msg + ", tag:" + tags + ", topic:" + topic);
System.out.println("*********************************");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//调用start()方法启动consumer
consumer.start();
System.out.println("Consumer Started....");
}
}
DefaultMQPullConsumer
处理 逻辑是逐个 读 取某 Topic 下所有 Message Queue 的内容, 读完一遍后退出, 主要处理额外的三件事情:
( 1 )获取 Message Queue 并遍历
一 个 Topic 包括多个 Message Queue,如果这个 Consumer 需要获取 Topic 下所有的消息,就 要遍历多有的 Message Queue。 如果有特殊情况,也可以选 择某些特定的 Message Queue 来读取消息 。
( 2 )维护 Offsetstore
从一个 Message Queue 里拉取消息的时候,要传人 Offset参数( long类型 的值),随着不断读取消息 , Offset会不断增长 。 这个时候由用户负责把 Offset 存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等 。
( 3 )根据不同的消息状态做不同的处理
拉取消息的请求发出后,会返回: FOUND、 NO_MATCHED_MSG、 NO_NEW_MSG、 OFFSET_ILLEGAL 四种状态,需要根据每个状态做不同的处理 。比较重要的两个状态是 FOUNT 和 NO NEW MSG ,分别表示获取到消息和没 有新的消息 。
package com.suirui.mq.rocket;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @Author zongx
* @Date 2020/5/7 17:59
* @Version 1.0
*/
public class PullConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("CID_LRW_DEV_SUBS");
consumer.setVipChannelEnabled(false);
consumer.setNamesrvAddr("localhost:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
上一篇: 1.Appium自动化测试-android:环境搭建
下一篇: Appium环境搭建
推荐阅读