windows搭建RocketMQ服务
一.系统及环境
windows7,JDK1.8,IntelliJ IDEA
二.RocketMQ的安装
1.下载RocketMQ的版本:
下载地址http://rocketmq.apache.org/release_notes/release-notes-4.2.0/
我下载的事这个
下载好的文件: rocketmq-all-4.2.0-bin-release.zip
2.然后将下载好的文件解压
解压后的文件目录如下:
3.修改配置
3.1修改rocketmq配置:网上有人建议修改(理由是有可能会因为内存设置过大导致一系列的问题),具体细节可以下去了解和学习,这里我也修改了
需要执行的命令都在程序目录下的bin文件夹下,主要是mqnamesrv.cmd
和mqbroker.cmd
这两个命令,但他们又都各自调用了runserver.cmd
和runbroker.cmd
命令,分别修改这两个文件里的JAVA_OPT设置,如下:
将
set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
改为
set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
3.2修改环境变量(很重要)
步骤如下:
添加环境变量值ROCKETMQ_HOME,变量值为mq的解压的路径
环境变量添加后,一般需要重启计算机(我的没有重启,mq一直启动不了,重启之后成功启动)
三.启动mq
1.启动 NameServer:
在cmd下进去mq的解压的bin路径,执行mqnamesrv.cmd -n localhost:9876命令
会弹出另外一个命令窗口,内容如下,说明NameServer启动成功(该窗口不要关闭)
如果执行命令行时,提示你设置环境变量,那么启动失败,原因是环境变量没有配好,请参照上文配置,重新配置
2.启动 Broker
在bin路径下执行命令start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true876 autoCreateTopicEnable=true
说明:官方文档,还有网上其他人,使用mqbroker.cmd -n localhost:9876命令启动Broker,但我试过,启动不了,在网上参照了一些大牛,使用了上面的方式(参考https://blog.csdn.net/yangding_/article/details/62419383)
执行成功后会弹出一个空白的cmd命令窗口,如下
目前我的状态时启动的(不要关闭这个窗口)
四.使用demo程序,要调试一下,消息的消费(参考https://www.cnblogs.com/lsx1993/p/4656351.html)
下面是我自己按照上面写的demo和调试的
1.maven工程目录结构
1.1pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.joe</groupId>
<artifactId>rocketMQtest</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rocketmqTest Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>3.2.6</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>
<build>
<finalName>rocketmqTest</finalName>
</build>
</project>
1.2.消费者
package com.joe.example;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
/**
* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
* 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
*/
public static void main(String[] args) throws InterruptedException,
MQClientException{
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ConsumerGroupName需要由应用来保证唯一
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"ConsumerGroupName");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("Consumber");
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
consumer.subscribe("TopicTest1","TagA || TagC || TagD");
/**
* 订阅指定topic下所有消息<br>
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe("TopicTest2","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt>msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName()
+" Receive New Messages: " + msgs.size());
MessageExt msg = msgs.get(0);
if(msg.getTopic().equals("TopicTest1")) {
//执行TopicTest1的消费逻辑
if(msg.getTags() != null && msg.getTags().equals("TagA")) {
//执行TagA的消费
System.out.println(new String(msg.getBody()));
}else if (msg.getTags() != null
&&msg.getTags().equals("TagC")) {
//执行TagC的消费
System.out.println(new String(msg.getBody()));
}else if (msg.getTags() != null
&&msg.getTags().equals("TagD")) {
//执行TagD的消费
System.out.println(new String(msg.getBody()));
}
}else if (msg.getTopic().equals("TopicTest2")) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();
System.out.println("ConsumerStarted.");
}
}
1.3生产者
package com.joe.example;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
public class Producer {
public static void main(String[] args) throws MQClientException,
InterruptedException{
/**
* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ProducerGroupName需要由应用来保证唯一<br>
* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
* 因为服务器会回查这个Group下的任意一个Producer
*/
final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("Producer");
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
/**
* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
*/
for (int i = 0; i < 10; i++){
try {
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQA").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest2",// topic
"TagB",// tag
"OrderID0034",// key
("Hello MetaQB").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest3",// topic
"TagC",// tag
"OrderID061",// key
("Hello MetaQC").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
}catch(Exception e) {
e.printStackTrace();
}
TimeUnit.MILLISECONDS.sleep(1000);
}
/**
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
*/
//producer.shutdown();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
producer.shutdown();
}
}));
System.exit(0);
}
}
1.4,执行
先执行消费者,在执行生产者
控制台日志如下
上一篇: Verilog模块实例化
推荐阅读
-
php 页面跳转 本土windows 环境正常,上传linux 服务器上跳转不了
-
最简单的nginx+ftp搭建图片服务器(Windows Server服务器环境下和本机都可以用)
-
如何Windows系统中搭建php环境
-
【MongoDB】windows平台搭建Mongo数据库复制集(类似集群)(三
-
CentOS 6(64-bit) + Nginx搭建静态文件服务器
-
三、记一次失败的 CAS 搭建 之 服务端配置_PHP教程
-
服务器搭建2
-
Windows Server 2016-MS服务器应用程序兼容性列表
-
windows环境开发yii上传程序至linux服务器时程序运行结果不一致的问题 windows最新版 windows10安装程序 windows会员
-
【MongoDB】windows平台搭建Mongo数据库复制集(类似集群)(二