欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMQ 发送消息的基本案例

程序员文章站 2022-07-14 23:42:35
...

RocketMQ 消息发送的基本样例

“一发、一存、一消费” 是消息中间件的本质,本文简单的记录来RocketMQ消息的基本样例,包含消息的发送(同步消息/异步消息/单向消息)、消息消费(负载均衡模式/广播模式)、顺序消息、延时消息、批量消息以及事务消息。

首先在测试项目中引入依赖

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.3.0</version>
</dependency>

一、基本样例

1、消息的发送

消息发送可以拆分为如下几个基本步骤:

 1. 创建消息生产者producer,并制定生产者组名
 2. 指定nameserver地址
 3. 启动producter
 4. 创建消息对象,指定topic、tag和消息体
 5. 发送消息
 6. 关闭生产者
a)发送同步消息

在重要的通知消息、SMS通知,SMS营销系统等广泛的场景中使用可靠的同步传输。

public class BaseProducer {
  public static void main(String[] args) throws Exception {
    //1.创建消息生产者producer,并制定生产者的组名
    DefaultMQProducer producer = new DefaultMQProducer("base-group");
    //2.指定NameServer地址
    producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //3.启动生产者
    producer.start();

    //每隔一秒发送1条消息,循环发送100条
    for (int i = 0; i < 100; i++) {
      String msgBody = "This is a msg to MQ" + i;
      System.out.println("发送消息:"+msgBody);
      //4.创建消息对象 指定主题、标签、消息体
      Message message = new Message("base-topic", "base-tag", 
      msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
      //5.发送顺序消息
      SendResult result = producer.send(message);
      //获取发送结果
      System.err.println("发送响应:MsgId:【" + result.getMsgId() 
      	+ "】,发送状态:【" + result.getSendStatus()+"】");

      //线程休眠一秒再去发送下一条消息
      TimeUnit.SECONDS.sleep(1);
    }

    //6.关闭生产者
    producer.shutdown();
  }
}

启动执行main方法,控制台得到如下打印输出:
RocketMQ 发送消息的基本案例

b)发送异步消息

异步传输通常用于对时间敏感的业务场景中,如果发送端不能容忍长时间的等待broker的响应的情况下可以选择发送异步消息。
与发送同步消息基本上一致,只有在第五步消息发送环节,通过提供的public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException 方法来发送异步消息。在发送时创建一个SendCallback接口的匿名内部实现类,实现消息发送成功的回调方法onSuccess(SendResult sendResult),以及消息发送失败出现异常时的onException(Throwable throwable)方法。

public class AsyncProducer1 {
  public static void main(String[] args) throws Exception{
    //1.创建消息生产者producer,并制定生产者的组名
    DefaultMQProducer producer = new DefaultMQProducer("async-group");
    //2.指定NameServer地址
    producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //3.启动生产者
    producer.start();
    //循环发送100条消息
    for (int i = 0; i < 100; i++) {
      String msgBody = "This is an async msg to MQ" + i;
      System.out.println("发送消息:"+msgBody);
      //4.创建消息对象 指定主题、标签、消息体
      Message message = new Message("async-topic", "async-tag", 
      msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
      //5.发送异步消息
      producer.send(message, new SendCallback() {
        /**
         * 发送成功的回调函数
         * @param sendResult
         */
        @Override
        public void onSuccess(SendResult sendResult) {
          System.out.println("发送结果:"+sendResult);
        }
        /**
         * 发送异常的回调函数
         * @param throwable
         */
        @Override
        public void onException(Throwable throwable) {
          System.out.println("发送异常:"+throwable);
        }
      });
    }
    //6.关闭生产者
    producer.shutdown();
  }
}

启动执行main方法,控制台得到如下打印输出:
RocketMQ 发送消息的基本案例

c)单向发送消息

单向消息一般用于不是特别关心发送结果的场景,例如发送日志。
发送单向消息,只是在基本单顺序消息发送案例的基础上使用public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException 方法对消息进行发送。

public class OnewayProducer1 {
  public static void main(String[] args) throws Exception{
    //1.创建消息生产者producer,并制定生产者的组名
    DefaultMQProducer producer = new DefaultMQProducer("one-way-group");
    //2.指定NameServer地址
    producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //3.启动生产者
    producer.start();

    //循环发送10条消息
    for (int i = 0; i < 10; i++) {
      String msgBody = "This is an one way msg to MQ" + i;
      System.out.println("发送消息:"+msgBody);
      //4.创建消息对象 指定主题、标签、消息体
      Message message = new Message("one-way-topic", "one-way-tag", 
      msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
      //5.发送单向消息
      producer.sendOneway(message);
    }

    //6.关闭生产者
    producer.shutdown();
  }
}

2、消费消息

消费消息可以分为如下几个步骤:

 1. 创建消费者consumer,制定消费者组名
 2. 指定nameserver地址
 3. 订阅主题topic和tag
 4. 设置回调函数,处理消息
 5. 启动消费者consumer

消费者DefaultMQPushConsumer提供了public void setMessageModel(MessageModel messageModel)方法来设置消费的模式。

MessageModel.BROADCASTING   广播模式
MessageModel.CLUSTERING  负载均衡模式(默认)
a)负载均衡模式(默认)

负载均衡模式指的是:一个消费者组内所有对消费者,共同承担消息的消费,每一个消费者消费的消息都是不同的。

public class BaseConsumer {
  public static void main(String[] args) throws Exception{
    //1.创建消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("base-group");
    //2.指定nameserver地址
    consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //3.订阅主题topic和tag
    consumer.subscribe("base-topic", "*");
    //4.注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, 
      ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
          for (MessageExt messageExt : list) {
            System.err.println("消费消息: " + new String(messageExt.getBody()));
          }
        } catch (Exception e) {
          e.printStackTrace();
          return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
      }
    });

    //5.启动消费者
    consumer.start();
  }
}
c)广播模式

广播模式是指组内的所有消费者都会消费全量的消息,在在消费者启动前,设置消费模式为MessageModel.BROADCASTING即可。

public class BaseConsumer {
  public static void main(String[] args) throws Exception{
    //1.创建消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("base-group");
    //2.指定nameserver地址
    consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //3.订阅主题topic和tag
    consumer.subscribe("base-topic", "*");

    //设定消费模式 【负载均衡模式(默认)/广播模式MessageModel.BROADCASTING】
    consumer.setMessageModel(MessageModel.BROADCASTING);

    //4.注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
       ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
          for (MessageExt messageExt : list) {
            System.err.println("消费消息: " + new String(messageExt.getBody()));
          }
        } catch (Exception e) {
          e.printStackTrace();
          return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
      }
    });

    //5.启动消费者
    consumer.start();
  }
}

二、顺序消息

消息中间件在数据结构上是一种先进先出的数据结构,消息的生产者将消息发送到broker中,存储在队列中,但实际上在一个broker中队列的数量往往大于一个,例如下图即为控制台中展示的broker中拥有四个队列。
RocketMQ 发送消息的基本案例
可以得出,当有消息发送到broker中到时候,在默认的情况下消息会采取Round Robin轮询方式把消息发送到不同的queue(分区队列)。而消费者来消费消息时,是从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的,如何保证消息当顺序消费,是用户在使用时需要结合业务场景来考虑的问题之一。

果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上拉去消息,就保证了顺序。
 - 将某些需要保持顺序的局部消息放到broker的同一个队列中。
 - 用同一个线程保证其处理broker中的一个队列的消息。

订单模型类

public class OrderStep {

  private Long orderId;
  private String desc;

  public OrderStep(Long orderId, String desc) {
    this.orderId = orderId;
    this.desc = desc;
  }

  public OrderStep() {
  }

  @Override
  public String toString() {
    return "OrderStep{" +
      "orderId=" + orderId +
      ", desc='" + desc + '\'' +
      '}';
  }

  public static List<OrderStep> buildOrders(){
    List<OrderStep> orderlist = new ArrayList<>();
    Long[] ids = {1001l,1002l,1003l,1004l};
    for(Long id : ids){
      OrderStep orderStep = new OrderStep(id,"创建");
      orderlist.add(orderStep);
      OrderStep orderStep = new OrderStep(id,"付款");
      orderlist.add(orderStep);
      OrderStep orderStep = new OrderStep(id,"发货");
      orderlist.add(orderStep);
       OrderStep orderStep = new OrderStep(id,"关闭");
      orderlist.add(orderStep);
    }
    return orderlist;

  }
}

顺序消息的生产者
消息生产者通过MessageQueueSelector 分区队列选择器,通过订单ID以及自定义的规则,对消息进行分区队列对选择。

public class OrderProducer {
  public static void main(String[] args) throws Exception{
    //创建消息生产者producer,并制定生产者的组名
    DefaultMQProducer producer = new DefaultMQProducer("order-group");
    //指定NameServer地址
    producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //启动生产者
    producer.start();
    //构建消息集合
    List<OrderStep> orderSteps = OrderStep.buildOrders();
    //发送消息
    int i = 0 ;
    for(OrderStep orderStep : orderSteps){
      String body = JSON.toJSONString(orderStep);
      Message msg = new Message("OrderTopic","order",""+(i++),body.getBytes());
      /**
       * 参数一:消息对象
       * 参数二:消息队列的选择器
       * 参数三:选择队列的业务表识(订单ID)
       */
      SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
        /**
         *
         * @param list 队列集合
         * @param message 消息对象
         * @param o 业务标示的参数
         * @return
         */
        @Override
        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
          Long orderId = (long) o;
          long index = orderId % list.size(); //订单ID除分区队列总数取模 
          System.out.println("队列总数【"+list.size()
          +"】,当前订单ID【"+orderId+"】选择的队列下标【"+index+"】");
          return list.get((int) index);
        }
      }, orderStep.getOrderId());
      System.out.println("发送结果:" + sendResult);
    }
    producer.shutdown();
  }
}

顺序消息的消费者
顺序消息的消费者 通过MessageListenerOrderly ,让一个队列的消息用一个线程去处理。

public class OrderConsumer {
  public static void main(String[] args) throws Exception{
    //1.创建消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-group");
    //2.指定nameserver地址
    consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //3.订阅主题topic和tag
    consumer.subscribe("OrderTopic", "*");
    //4.注册消息监听器  MessageListenerOrderly 一个队列的消息用一个线程去处理
    consumer.registerMessageListener(new MessageListenerOrderly() {
      /**
       *
       * @param  msgs 获取到的消息
       * @param consumeOrderlyContext
       * @return
       */
      @Override
      public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
       ConsumeOrderlyContext consumeOrderlyContext) {

        for(MessageExt msg : msgs){
          String str = new String(msg.getBody());
          System.out.println("线程名称【"+Thread.currentThread().getName()
          +"】消费消息:" + str);
        }
        return ConsumeOrderlyStatus.SUCCESS;
      }
    });

    //5.启动消费者
    consumer.start();
  }
}

三、延时消息

当发送当消息不需要立即消费,需要设置一定当延迟时间时,可以悬着发送延迟消息,该类型当消息并不会立即被消费,会延迟一段设定好的时间后才会被消费者消费。

 RocketMq并不支持任意时间的延迟,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
   “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”

通过Message类中提供的 public void setDelayTimeLevel(int level)方法来设置延迟的级别。

延迟消息的生产者:

public class DelayProducer {
  public static void main(String[] args) throws Exception{
    //创建消息生产者producer,并制定生产者的组名
    DefaultMQProducer producer = new DefaultMQProducer("order-group");
    //指定NameServer地址
    producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //启动生产者
    producer.start();
    //构建消息集合
    List<OrderStep> orderSteps = OrderStep.buildOrders();
    //发送消息
    int i = 0 ;
    for(OrderStep orderStep : orderSteps){
      String body = JSON.toJSONString(orderStep);
      Message msg = new Message("DelayTopic","d1",""+(i++),body.getBytes());
      msg.setDelayTimeLevel(2); //设置延迟时间级别
      /**
       * 参数一:消息对象
       * 参数二:消息队列的选择器
       * 参数三:选择队列的业务表识(订单ID)
       */
      SendResult sendResult = producer.send(msg);
      System.out.println("发送结果:" + sendResult);
    }
    producer.shutdown();
  }
}

四、批量消息

当消息数量很多当时候,可以通过批量发送当形式来提高消息的发送效率。批量发送消息能够显著提高传递小消息的性能。

   限制:
   - 这批消息应该拥有相同的topic,相同分waitStoremsgOK,而且不能是延迟消息。
   - 批消息的总大小不应该超过4MB

批量消息的生产者:

public class BatchProducer {
  public static void main(String[] args) throws Exception{
    //1.创建消息生产者producer,并制定生产者的组名
    DefaultMQProducer producer = new DefaultMQProducer("batch-group");
    //2.指定NameServer地址
    producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //3.启动生产者
    producer.start();
    //4.构建一个消息集合
    List<Message> msgs = new ArrayList<>();

    for(int i = 1 ; i<=10 ; i++){
      Message message = new Message("batch-topic", "batch-tag", 
      ("Hello World" + i ).getBytes());
      msgs.add(message);
    }

    //5.发送批量消息
    SendResult send = producer.send(msgs);
    System.out.println("批量发送结果:" + send);
    
    //6.关闭生产者
    producer.shutdown();
  }
}

五、事务消息

RocketMQ提供事务消息的发送,主要是为了解决本地事务执行与消息发送的原子性问题。即解决Producer执行业务逻辑成功之后投递消息可能失败的场景。
RocketMQ 发送消息的基本案例
事务消息的状态
1.提交状态 :允许消费者消费此消息
2.回归状态 :消息将被删除,不允许被消费
3.中间状态 :代表需要检查消息队列来确定状态
事务消息发送及提交
1.发送消息(half消息)
2.服务端响应写入结果
3.根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
4.根据本地事务状态执行commit或者rollback (commit操作生成消息索引,消息对消费者可见)
事务补偿
1.对没有commit/rolback对事务消息(pending状态对消息)从服务端发起一次回查
2.Producer收到回查消息,检查回查消息对应对的本地事务的状态
3.根据本地事务状态,重新commit汇总rollback,其中,补偿阶段用于解决消息commiy或者rollback发生超时或者失败的情况。
使用的限制
1.事务消息不支持延迟消息和批量消息
2.未来避免单个消息被检查太多次而导致半队列消息积累,可以设置默认单个消息的检查次数限制为15次,但是用户可以通过Broker配置文件的transactionCheckMax参数来修改此限制。如果已检查某条消息超过N次的话(N=transactionCheckMax)则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。
3.事务消息将在broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,该参数优先于transactionMsgTimeout参数。
4.事务性消息可能不止一次被检查或消费。
5.提交给用户的目标主题消费可能会失败,目前这依据日志的记录而定,它的高可用性机制来保证,如果希望确保事务消费不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6.事务消息的生产者ID不能与其他类型消费的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者。

事务消息的生产者

public class Producer {
  public static void main(String[] args) throws Exception{
    //1.创建事务消息生产者producer,并制定生产者的组名
    TransactionMQProducer producer = new TransactionMQProducer("transaction-group");
    //2.指定NameServer地址
    producer.setNamesrvAddr("39.108.128.240:9876");
    //3.设置事务监听器
    producer.setTransactionListener(new MyTransactionListener());
    //4.启动生产者
    producer.start();
    //每隔一秒,循环发送5条消息
    for (int i = 0; i < 4; i++) {
      String body = "事务消息【"+i+"】";
      //创建消息对象 指定主题、标签、消息体  同步和异步是一致到
      Message message = new Message("transaction-topic", "T-"+i, body.getBytes());
      //同步发送消息
      SendResult sendResult = producer.sendMessageInTransaction(message,null);
      //打印消息发送的结果
      System.out.println("发送结果:"+sendResult);
      //线程休眠一秒再去发送下一条消息
      TimeUnit.SECONDS.sleep(1);
    }
    //关闭生产者 不建议在事务消息中关闭,可能回导致事务监听器不执行
    //producer.shutdown();

  }
}

自定义的事务监听器

public class MyTransactionListener implements TransactionListener {
  //在该方法中执行本地的一些事务
  @Override
  public LocalTransactionState executeLocalTransaction(Message message, Object o) {
    if(message.getTags().equals("T-1")){
      System.out.println("tag【"+message.getTags()+"】-- 事务提交");
      return LocalTransactionState.COMMIT_MESSAGE;
    }else if(message.getTags().equals("T-2")){
      System.out.println("tag【"+message.getTags()+"】-- 事务回滚");
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }else {
      System.out.println("tag【"+message.getTags()+"】-- 事务结果未知");
      return LocalTransactionState.UNKNOW;
    }
  }
  //MQ在没有接收到事务结果时,发送请求来二次核对 ,消息事务状态的回查
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    System.out.println("MQ开始事务状态的回查");
    if(messageExt.getTags().equals("T-3")){
      System.out.println("tag【"+messageExt.getTags()+"】-- 事务提交");
      return LocalTransactionState.COMMIT_MESSAGE;
    }else{
      System.out.println("tag【"+messageExt.getTags()+"】-- 仍然结果未知");
      return LocalTransactionState.UNKNOW;
    }
  }
}

事务消息的消费者

public class Consumer {
  public static void main(String[] args) throws Exception{
    //1.创建消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-group");
    //2.指定nameserver地址
    consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //3.订阅主题topic和tag
    consumer.subscribe("transaction-topic", "*");
    //4.设置回调函数 处理消息
    consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
      try {
        for (MessageExt messageExt : list) {
          System.err.println("消费消息: " + new String(messageExt.getBody()));//输出消息内容
        }
      } catch (Exception e) {
        e.printStackTrace();
        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
      }
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
    });
    //5.启动消费者
    consumer.start();
    //6.打印启动状态
    System.out.println("消费者启动成功");
  }
}

六、消费消息的过滤

针对消费者, 消费者来决定对哪些消息进行消费,对哪些消息不消费。
提供两种消息过滤的方式
1.依据tag来过滤
2.依据RocketMQ定义了一些基本语法来支持过滤 (只有使用push模式对消费者才能使用SQL92标准对sql语句)

数值比较 例如: > , >= ,< , <= , BETWEEN , = ;
字符比较 例如:= ,<> ,IN;
IS NULL 或者 IS NOT NULL ;
逻辑符号 AND , OR ,NOT;
常量支持类型:
数值 例如:123,3.1415926
字符 例如:‘abc’ 必须用单引号包裹起来
特殊字符 例如:NULL
布尔值 TRUE 或 FALSE

sql模式过滤的消息生产者
通过Message对象提供的 public void putUserProperty(String name, String value) 方法,添加自定义sql查询的标示与内容。

public class Producer {
  public static void main(String[] args) throws Exception{
    //创建消息生产者producer,并制定生产者的组名
    DefaultMQProducer producer = new DefaultMQProducer("sql-filter-group");
    //指定NameServer地址
    producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //启动生产者
    producer.start();

    //每隔一秒,循环发送5条消息
    for (int i = 0; i < 5; i++) {

      String body = "通过tag过滤消息"+i;
      //创建消息对象 指定主题、标签、消息体  同步和异步是一致到
      Message message = new Message("sql-filter-topic", "sql-filter-tag", body.getBytes());
      //设置消息 用户自定义属性
      message.putUserProperty("i",String.valueOf(i));

      //同步发送消息
      SendResult sendResult = producer.send(message);
      //打印消息发送的结果
      System.out.println("发送结果:"+sendResult);
      //线程休眠一秒再去发送下一条消息
      TimeUnit.SECONDS.sleep(1);
    }

  }
}

sql模式过滤的消息消费者

public class Consumer {
  public static void main(String[] args) throws Exception{
    //1.创建消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sql-filter-group");
    //2.指定nameserver地址
    consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开
    //3.订阅主题topic和tag  依据消息sql过滤器来过滤消息
    consumer.subscribe("sql-filter-topic", MessageSelector.bySql("i>=0"));
    //4.设置回调函数 处理消息
    consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
      try {
        for (MessageExt messageExt : list) {

          System.err.println("消费消息:【 " + new String(messageExt.getBody())+"】");
        }
      } catch (Exception e) {
        e.printStackTrace();
        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
      }
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
    });
    
    //5.启动消费者
    consumer.start();
    //6.打印启动状态
    System.out.println("消费者启动成功");
  }
}

参考学习视频 黑马程序员《RocketMQ系统精讲,经受历年双十一狂欢节考验的分布式消息中间件》,图片来源于网络侵删~

相关标签: java