RocketMQ快速入门(一)
程序员文章站
2022-03-23 12:55:37
...
RocketMQ快速入门(一)
下载RocketMQ
-
下载地址http://rocketmq.apache.org/
unzip rocketmq-all-4.3.0-source-release.zip
cd rocketmq-all-4.3.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/apache-rocketmq
启动Name Server
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
启动Broker
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
tail -f ~/logs/rocketmqlogs/broker.log
注意:按照官网的启动方法(nohup sh bin/mqbroker -n localhost:9876 &)启动 Broker 后,发送消息时可能会出现如下错误:
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest
关闭RocketMQ
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
安装控制台
地址https://github.com/apache/rocketmq-externals/tree/master
cd rocketmq-console
mvn clean package -Dmaven.test.skip=true
cd target
nohup java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=127.0.0.1:9876 &
生产者&消费者demo
- 新增maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
同步消息
/**
 * Producer example: synchronous send.
 *
 * <p>Sends 100 messages to topic {@code TopicTest} with tag {@code TagA}, one per loop
 * iteration, and prints the {@code SendResult} returned by each {@code send} call.
 */
public class SyncProducer {
    /**
     * Starts a producer against the name server at {@code localhost:9876}, sends the
     * messages and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding a body or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        // Create the producer with its producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the name server address.
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                // Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest",
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                // Send the message and print whatever send() returned for it.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
        } finally {
            // Shut down in finally so a failed send does not skip the shutdown call.
            producer.shutdown();
        }
    }
}
异步消息
/**
 * Producer example: asynchronous send.
 *
 * <p>Queues 100 messages to topic {@code TopicTest} (tag {@code TagA}, key
 * {@code OrderID188}) using {@code send(msg, callback)}. Each callback prints the message
 * index together with either the message id or the exception.
 */
public class AsyncProducer {
    /**
     * Starts a producer against the name server at {@code localhost:9876}, issues the
     * asynchronous sends, waits (bounded) for their callbacks and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding a body or issuing a send fails,
     *     or if the wait for callbacks is interrupted
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        final int messageCount = 100;
        // One count per message; released by whichever callback fires for that message.
        // Fully-qualified names are used because this snippet carries no import block.
        final java.util.concurrent.CountDownLatch pending =
            new java.util.concurrent.CountDownLatch(messageCount);
        try {
            for (int i = 0; i < messageCount; i++) {
                // Anonymous classes can only capture final locals, hence the copy.
                final int index = i;
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                        pending.countDown();
                    }

                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                        pending.countDown();
                    }
                });
            }
            // Without this wait, shutdown() would run immediately after the sends were
            // queued, before their callbacks had a chance to report. The wait is bounded
            // so a lost callback cannot hang the example forever.
            pending.await(5, java.util.concurrent.TimeUnit.SECONDS);
        } finally {
            // Shut down in finally so a failure while issuing sends does not skip it.
            producer.shutdown();
        }
    }
}
单向消息(Oneway)
/**
 * Producer example: one-way send.
 *
 * <p>Sends 100 messages to topic {@code TopicTest} with tag {@code TagA} through
 * {@code sendOneway}; no result of the call is captured or printed.
 */
public class OnewayProducer {
    /**
     * Starts a producer against the name server at {@code localhost:9876}, sends the
     * messages and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding a body or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                producer.sendOneway(msg);
            }
            // NOTE(review): nothing here confirms delivery before shutdown() runs; confirm
            // against the client documentation whether a short wait is needed at this point.
        } finally {
            // Shut down in finally so a failed sendOneway does not skip the shutdown call.
            producer.shutdown();
        }
    }
}
消费者
/**
 * Consumer example.
 *
 * <p>Creates a push consumer, subscribes it to topic {@code TopicTest} with the
 * expression {@code "*"}, and registers a listener that prints every delivered list of
 * messages along with the name of the thread handling it.
 */
public class Consumer {
    /**
     * Configures and starts the consumer against the name server at {@code localhost:9876}.
     *
     * @param args unused
     * @throws InterruptedException declared by the example's signature
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Listener that reports each delivery and always answers CONSUME_SUCCESS.
        MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> batch,
                ConsumeConcurrentlyContext ctx) {
                String threadName = Thread.currentThread().getName();
                System.out.printf("%s Receive New Messages: %s %n", threadName, batch);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };

        DefaultMQPushConsumer pushConsumer =
            new DefaultMQPushConsumer("please_rename_unique_group_name");
        pushConsumer.setNamesrvAddr("localhost:9876");
        pushConsumer.subscribe("TopicTest", "*");
        pushConsumer.registerMessageListener(printingListener);
        pushConsumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
注意:本文代码来自官方网站