
RocketMQ Quick Start (Part 1)

程序员文章站 2022-03-23 12:55:37

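Download RocketMQ

  • Download the source release from http://rocketmq.apache.org/, then unpack and build it. The scripts under bin/ used in the rest of this article are run from the build output directory.

    unzip rocketmq-all-4.3.0-source-release.zip
    cd rocketmq-all-4.3.0/
    mvn -Prelease-all -DskipTests clean install -U
    cd distribution/target/apache-rocketmq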

Start the Name Server

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
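
Besides reading the log, a quick way to confirm that the Name Server is accepting connections is to open a TCP connection to its port. The following is a minimal sketch using only the Java standard library; the class name PortCheck and the method isReachable are illustrative names, not part of RocketMQ, and 9876 is the Name Server port used throughout this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unresolved: treat all as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumes the Name Server was started on this machine with its port set to 9876.
        System.out.println("Name Server reachable: " + isReachable("localhost", 9876, 2000));
    }
}
```

A successful connection only shows that something is listening on the port; namesrv.log remains the place to confirm that startup completed.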

Start the Broker

nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
tail -f ~/logs/rocketmqlogs/broker.log

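Note: starting the broker exactly as the official site describes (nohup sh bin/mqbroker -n localhost:9876 &) may fail later, when a producer sends to TopicTest, with the error below. The command above adds autoCreateTopicEnable=true for that reason.
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest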

Shut Down RocketMQ

sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

Install the Console

Producer & Consumer Demo

  • Add the Maven dependency
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
    </dependency>

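Synchronous Messages

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Producer: synchronous messages
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Create the producer with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
        // Set the Name Server address.
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest",
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // Send the message; send() blocks until the broker responds.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down the producer.
        producer.shutdown();
    }
}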

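Asynchronous Messages

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Producer: asynchronous messages
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        // Set the retry count for failed asynchronous sends to 0.
        producer.setRetryTimesWhenSendAsyncFailed(0);
        final int messageCount = 100;
        // Counts callbacks that have not fired yet, so shutdown() does not cut them off.
        final CountDownLatch pending = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            // Arguments: topic, tag, key, body.
            Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // send() returns immediately; the result arrives through the callback.
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    pending.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    pending.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        // Wait up to 5 seconds for the callbacks before shutting down.
        pending.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}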
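
An asynchronous send returns before the broker has acknowledged the message, so shutting the producer down right after the loop can cut off callbacks that are still pending. One common way to handle this is to count the outstanding callbacks with a CountDownLatch and wait for them before shutting down. The sketch below illustrates only that waiting pattern: it uses a thread pool as a stand-in for the client's asynchronous I/O, and AwaitCallbacks, sendAllAndWait and fakeSender are hypothetical names, not RocketMQ API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class AwaitCallbacks {
    /**
     * Submits `count` simulated asynchronous sends, waits up to timeoutSeconds
     * for their callbacks, and returns how many callbacks ran.
     */
    static int sendAllAndWait(int count, long timeoutSeconds) throws InterruptedException {
        // Stand-in for the producer's asynchronous send machinery (an assumption for illustration).
        ExecutorService fakeSender = Executors.newFixedThreadPool(4);
        CountDownLatch pending = new CountDownLatch(count);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            fakeSender.submit(() -> {
                try {
                    completed.incrementAndGet(); // plays the role of onSuccess
                } finally {
                    pending.countDown(); // count down on success and on failure alike
                }
            });
        }
        // Block until every callback has run or the timeout elapses, and only then shut down.
        pending.await(timeoutSeconds, TimeUnit.SECONDS);
        fakeSender.shutdown();
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("callbacks completed: " + sendAllAndWait(100, 5));
    }
}
```

In a real producer the countDown() calls would sit in both onSuccess and onException, so that a failed send does not leave the latch waiting until the timeout.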

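One-way Messages

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Producer: one-way messages
 */
public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Fire and forget: sendOneway() returns no result and does not wait for the broker.
            producer.sendOneway(msg);
        }
        producer.shutdown();
    }
}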

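Consumer

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Consumer
 */
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Create the consumer with a consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to TopicTest; "*" matches every tag.
        consumer.subscribe("TopicTest", "*");
        // Register the callback that runs when messages arrive.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}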
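
The consumer above prints the message objects themselves, while the payload travels as raw bytes: the producers encode their text with RemotingHelper.DEFAULT_CHARSET, which is UTF-8. To read the text back on the consumer side, the bytes returned by getBody() on each message need to be decoded with the same charset. A minimal sketch of that round trip, using only the standard library; BodyCodec and its methods are illustrative names, not RocketMQ API.

```java
import java.nio.charset.StandardCharsets;

final class BodyCodec {
    /** Encodes text the way the producers in this article do (UTF-8 bytes). */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a message body back into text, assuming it was written as UTF-8. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello RocketMQ 0"); // what the producer puts in the message
        System.out.println(decode(body));          // what the consumer can recover from the body
    }
}
```

Inside consumeMessage, the same decoding could be applied to each element of msgs, for example decode(msg.getBody()).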

Note: the code in this article comes from the official website.
