欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringBoot集成RocketMQ

程序员文章站 2022-06-18 09:37:52
实战,用案例来说话 前面已经说了JMS和RocketMQ一些概念和安装,下面使用SpringBoot来亲身操作一下. 生产者的操作 1. SpringBoot项目创建完成,引入依赖是第一步: 2. 创建生产者是第二步,生产者必须依赖于生产组,而且需要指定nameServer 3. 创建Control ......

实战,用案例来说话

前面已经说了jms和rocketmq一些概念和安装,下面使用springboot来亲身操作一下.

生产者的操作

  1. springboot项目创建完成,引入依赖是第一步:
<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-web</artifactid>
</dependency>

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-test</artifactid>
    <scope>test</scope>
</dependency>

<dependency>
    <groupid>org.apache.rocketmq</groupid>
    <artifactid>rocketmq-client</artifactid>    
    <version>4.3.0</version>
</dependency>
  1. 创建生产者是第二步,生产者必须依赖于生产组,而且需要指定nameserver
@component
public class payproducer {

    /**
     * 生产组,生产者必须在生产组内
     */
    private string producergroup = "pay_group";

    /**
     * 端口
     */
    private string nameserver = "39.106.214.179:9876";


    private defaultmqproducer producer;

    public payproducer() {
        producer = new defaultmqproducer(producergroup);
        // 指定nameserver地址,多个地址之间以 ; 隔开
        producer.setnamesrvaddr(nameserver);
        start();
    }

    public defaultmqproducer getproducer() {
        return producer;
    }

    /**
     * 对象在使用之前必须调用一次,并且只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (mqclientexception e) {
            e.printstacktrace();
        }
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        producer.shutdown();
    }

}
  1. 创建controller进行测试发送消息,必须要指定topic,消息依赖于主题
@restcontroller
public class paycontroller {

    @autowired
    private payproducer payproducer;

    /**
     * topic,消息依赖于topic
     */
    private static final string topic = "pay_test_topic";


    @requestmapping("/api/v1/pay_cb")
    public object callback(string text) throws interruptedexception, remotingexception, mqclientexception, mqbrokerexception {
        // 创建消息  主题   二级分类   消息内容好的字节数组
        message message = new message(topic, "taga", ("hello rocketmq " + text).getbytes());

        sendresult send = payproducer.getproducer().send(message);

        system.out.println(send);

        return new hashmap<>();
    }

}
  1. 采坑记录
    • 上面完成就可以启动项目了,访问之后报错了:
    mqclientexception: no route info of this topic, topictest1
    这个的原因就是broker禁止自动创建topic且用户没有通过手动方式创建topic, 或者是broker与nameserver网络不通
    解决:
        使用手动创建topic,在rocketmq控制台的主题中创建就好,最主要的是指定topic name,如下图
            出现创建不了的情况往下看
    
        如果还出现这个问题,请关闭防火墙

    SpringBoot集成RocketMQ

    • 这次说下上面可能创建不了的问题,前面说了安装开放安全组,这次就是因为rocketmq虚拟的端口问题,需要开放10909,也就是说ecs最终开放的端口号: 8080,10911,9876,10909

    • 继续采坑
    org.apache.rocketmq.remoting.exception.remotingtoomuchrequestexception: senddefaultimpl call timeout
    
    这个问题是阿里云服务器存在多个网卡,rocketmq会根据当前网卡选择一个ip使用,我们需要制定一个ip:
        路径是: /usr/local/software/rocketmq/distribution/target/apache-rocketmq
    
        vim ./conf/broker.conf
    
        添加配置: brokerip1=公网ip
    
        重新启动: 
            nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
            tail -f nohup.out
    • 其他问题
        https://blog.csdn.net/qq_14853889/article/details/81053145
        https://blog.csdn.net/wangmx1993328/article/details/81588217#%e5%bc%82%e5%b8%b8%e8%af%b4%e6%98%8e
        https://www.jianshu.com/p/bfd6d849f156
        https://blog.csdn.net/wangmx1993328/article/details/81588217

消费者操作

  1. 在前一个项目的基础上,将公共内容提取出来,创建一个jsmconfig的类,来声明公共内容:
    ```
    public class jmsconfig {

     /**
      * 端口
      */
     public static final string name_server = "39.106.214.179:9876";
    
     /**
      * topic,消息依赖于topic
      */
     public static final string topic = "pay_test_topic";
    }
    ```
  2. 生产者内容变为
    ```
    @component
    public class payproducer {

     /**
      * 生产组,生产者必须在生产组内
      */
     private string producergroup = "pay_producer_group";
    
     private defaultmqproducer producer;
    
     public payproducer() {
         producer = new defaultmqproducer(producergroup);
         // 指定nameserver地址,多个地址之间以 ; 隔开
         producer.setnamesrvaddr(jmsconfig.name_server);
         start();
     }
    
     public defaultmqproducer getproducer() {
         return producer;
     }
    
     /**
      * 对象在使用之前必须调用一次,并且只能初始化一次
      */
     public void start() {
         try {
             this.producer.start();
         } catch (mqclientexception e) {
             e.printstacktrace();
         }
     }
    
     /**
      * 一般在应用上下文,使用上下文监听器,进行关闭
      */
     public void shutdown() {
         producer.shutdown();
     }
    }
    ```
  3. 创建消费者
    ```
    @component
    public class payconsumer {

     private defaultmqpushconsumer consumer;
    
     private string consumergroup = "pay_consumer_group";
    
     public payconsumer() throws mqclientexception {
         consumer = new defaultmqpushconsumer(consumergroup);
         consumer.setnamesrvaddr(jmsconfig.name_server);
         // 设置消费地点,从最后一个进行消费(其实就是消费策略)
         consumer.setconsumefromwhere(consumefromwhere.consume_from_last_offset);
         // 订阅主题的哪些标签
         consumer.subscribe(jmsconfig.topic, "*");
         // 注册监听器
         consumer.registermessagelistener((messagelistenerconcurrently)
                 (msgs, context) -> {
                     try {
                         // 获取message
                         message msg = msgs.get(0);
                         system.out.printf("%s receive new messages: %s %n",
                                 thread.currentthread().getname(), new string(msgs.get(0).getbody()));
                         string topic = msg.gettopic();
                         string body = new string(msg.getbody(), "utf-8");
                         // 标签
                         string tags = msg.gettags();
                         string keys = msg.getkeys();
                         system.out.println("topic=" + topic + ", tags=" + tags + ",keys=" + keys + ", msg=" + body);
                         return consumeconcurrentlystatus.consume_success;
                     } catch (unsupportedencodingexception e) {
                         e.printstacktrace();
                         return consumeconcurrentlystatus.reconsume_later;
                     }
                 });
         consumer.start();
         system.out.println("consumer listener");
     }

    }
    ```

  4. controller的变化:
    ```
    @restcontroller
    public class paycontroller {

     @autowired
     private payproducer payproducer;
    
    
    
     @requestmapping("/api/v1/pay_cb")
     public object callback(string text) throws interruptedexception, remotingexception, mqclientexception, mqbrokerexception {
         // 创建消息  主题   二级分类   消息内容好的字节数组
         message message = new message(jmsconfig.topic, "taga", ("hello rocketmq " + text).getbytes());
    
         sendresult send = payproducer.getproducer().send(message);
    
         system.out.println(send);
    
         return new hashmap<>();
     }

    }
    ```

梳理一下整个流程,生产者存在于生产组,所以生产组很重要,创建生产者需要指定生产组.消费者同理,创建消费者也需要指定消费组. 并且二者都需要指定nameserver. 有了生产者就要发送消息,也就是message,创建message需要指定topic,二级分类和消息体等信息. 那消费者如何获取呢? 无非就是绑定topic和二级分类就可以,这就是整个流程. 中间少说了消息的存放,消息是在broker中,这个相当于仓库,所以就是生产者生产消息到broker,consumer从broker中获取消息进行消费.