欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

浅谈Springboot整合RocketMQ使用心得

程序员文章站 2023-12-11 16:04:52
一、阿里云官网---帮助文档 https://help.aliyun.com/document_detail/29536.html?spm=5176.doc295...

一、阿里云官网---帮助文档

https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.wwtiuh

按照官网步骤,创建topic、申请发布(生产者)、申请订阅(消费者)

二、代码

1、配置:

public class mqconfig {
  /**
   * 启动测试之前请替换如下 xxx 为您的配置
   */
  public static final string public_topic = "test";//公网测试
  public static final string public_producer_id = "pid_scheduler";
  public static final string public_consumer_id = "cid_service";

  public static final string access_key = "123";
  public static final string secret_key = "123";
  public static final string tag = "";
  public static final string thread_num = "25";//消费端线程数
  /**
   * onsaddr 请根据不同region进行配置
   * 公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
   * 公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
   * 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
   * 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
   */
  public static final string onsaddr = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal";
}

onsaddr 阿里云用 公有云生产,测试用公网

不同的业务可以设置不同的tag,但是如果发送消息量大的话,建议新建topic

2、生产者

方式1:

配置文件:producer.xml

<?xml version="1.0" encoding="utf-8"?>
<!doctype beans public "-//spring//dtd bean//en" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
  <bean id="producer" class="com.aliyun.openservices.ons.api.bean.producerbean"
     init-method="start" destroy-method="shutdown">
    <property name="properties">
      <map>
        <entry key="producerid" value="" /> <!-- pid,请替换 -->
        <entry key="accesskey" value="" /> <!-- access_key,请替换 -->
        <entry key="secretkey" value="" /> <!-- secret_key,请替换 -->
        <!--propertykeyconst.onsaddr 请根据不同region进行配置
         公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
         公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal -->
        <entry key="onsaddr" value="http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/>
      </map>
    </property>
  </bean>
</beans>

启动方式1,在使用类的全局里设置:

//初始化生产者
  private applicationcontext ctx;
  private producerbean producer;

  @value("${producerconfig.enabled}")//开关,spring配置项,true为开启,false关闭
  private boolean producerconfigenabled;

  @postconstruct
  public void init(){
    if (true == producerconfigenabled) {
      ctx = new classpathxmlapplicationcontext("producer.xml");
      producer = (producerbean) ctx.getbean("producer");
    }
  }

ps:最近发现一个坑,如果producer用上面这种方式启动的话,一旦启动的多了,会造成fullgc,所以可以换成下面这种注解方式启动,在用到的地方手动start、shutdown

方式2:配置类(不需要xml)

@configuration
public class producerbeanconfig {

  @value("${openservices.ons.producerbean.producerid}")
  private string producerid;

  @value("${openservices.ons.producerbean.accesskey}")
  private string accesskey;

  @value("${openservices.ons.producerbean.secretkey}")
  private string secretkey;

  private producerbean producerbean;

  @value("${openservices.ons.producerbean.onsaddr}")
  private string onsaddr;

  @bean
  public producerbean oneproducer() {
    producerbean producerbean = new producerbean();
    properties properties = new properties();
    properties.setproperty(propertykeyconst.producerid, producerid);
    properties.setproperty(propertykeyconst.accesskey, accesskey);
    properties.setproperty(propertykeyconst.secretkey, secretkey);
    properties.setproperty(propertykeyconst.onsaddr, onsaddr);

    producerbean.setproperties(properties);
    return producerbean;
  }
}

ps:经过这次双11发现,以上2种方式在大数据量,多线程情况下都不太适用, 性能很差,所以推荐用3

方式3:(不需要xml)

@component
public class producerbeansingleton {

  @value("${openservices.ons.producerbean.producerid}")
  private string producerid;

  @value("${openservices.ons.producerbean.accesskey}")
  private string accesskey;

  @value("${openservices.ons.producerbean.secretkey}")
  private string secretkey;

  @value("${openservices.ons.producerbean.onsaddr}")
  private string onsaddr;

  private static producer producer;

  private static class singletonholder {
    private static final producerbeansingleton instance = new producerbeansingleton();
  }

  private producerbeansingleton (){}

  public static final producerbeansingleton getinstance() {
    return singletonholder.instance;
  }

  @postconstruct
  public void init(){
    // producer 实例配置初始化
    properties properties = new properties();
    //您在控制台创建的producer id
    properties.setproperty(propertykeyconst.producerid, producerid);
    // accesskey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.setproperty(propertykeyconst.accesskey, accesskey);
    // secretkey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.setproperty(propertykeyconst.secretkey, secretkey);
    //设置发送超时时间,单位毫秒
    properties.setproperty(propertykeyconst.sendmsgtimeoutmillis, "3000");
    // 设置 tcp 接入域名(此处以公共云生产环境为例)
    properties.setproperty(propertykeyconst.onsaddr, onsaddr);
    producer = onsfactory.createproducer(properties);
    // 在发送消息前,必须调用start方法来启动producer,只需调用一次即可
    producer.start();
  }

  public producer getproducer(){
    return producer;
  }
}

spring配置

spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.mysql5dialect

consumerconfig.enabled = true

producerconfig.enabled = true #方式1:

scheduling.enabled = false

#方式2、3:rocketmq \u516c\u7f51\u914d\u7f6e
openservices.ons.producerbean.producerid = pid
openservices.ons.producerbean.accesskey = 
openservices.ons.producerbean.secretkey = 

openservices.ons.producerbean.onsaddr = 公网、杭州公有云生产

方式1投递消息代码:

 try {
   string jsonc = jsonutils.tojson(elevenmessage);
   message message = new message(mqconfig.topic, mqconfig.tag, jsonc.getbytes());
   sendresult sendresult = producer.send(message);
   if (sendresult != null) {
     logger.info(".send mq message success!”;

   } else {
     logger.warn(".sendresult is null.........");
   }
   } catch (exception e) {
      logger.warn("doubleelevenallpreservice");
      thread.sleep(1000);//如果有异常,休眠1秒
   }

方式2投递消息代码:(可以每发1000个启动/关闭一次)

   producerbean.start();
try {
   string jsonc = jsonutils.tojson(elevenmessage);
   message message = new message(mqconfig.topic, mqconfig.tag, jsonc.getbytes());
   sendresult sendresult = producer.send(message);
   if (sendresult != null) {
     logger.info(".send mq message success!”;

   } else {
     logger.warn(".sendresult is null.........");
   }
   } catch (exception e) {
      logger.warn("doubleelevenallpreservice");
      thread.sleep(1000);//如果有异常,休眠1秒
   }

   producerbean.shutdown();

方式3:投递消息

 try {
   string jsonc = jsonutils.tojson(elevenmessage);
   message message = new message(mqconfig.topic, mqconfig.tag, jsonc.getbytes());
   producer producer = producerbeansingleton.getinstance().getproducer();
   sendresult sendresult = producer.send(message);
   if (sendresult != null) {
     logger.info("doubleelevenmidservice.send mq message success! topic is:"”;

   } else {
     logger.warn("doubleelevenmidservice.sendresult is null.........");
   }
   } catch (exception e) {
     logger.error("doubleelevenmidservice thread.sleep 1 s___error is "+e.getmessage(), e);
     thread.sleep(1000);//如果有异常,休眠1秒
   }

发送消息的代码一定要捕获异常,不然会重复发送。

这里的topic用自己创建的,elevenmessage是要发送的内容,我这里是自己建的对象

3、消费者

配置启动类:

@configuration
@conditionalonproperty(value = "consumerconfig.enabled", havingvalue = "true", matchifmissing = true)
public class consumerconfig {

  private logger logger = loggerfactory.getlogger(loggerappendertype.smsdist.name());

  @bean
  public consumer consumerfactory(){//不同消费者 这里不能重名
    properties consumerproperties = new properties();
    consumerproperties.setproperty(propertykeyconst.consumerid, mqconfig.consumer_id);
    consumerproperties.setproperty(propertykeyconst.accesskey, mqconfig.access_key);
    consumerproperties.setproperty(propertykeyconst.secretkey, mqconfig.secret_key);
    //consumerproperties.setproperty(propertykeyconst.consumethreadnums,mqconfig.thread_num);
    consumerproperties.setproperty(propertykeyconst.onsaddr, mqconfig.onsaddr);
    consumer consumer = onsfactory.createconsumer(consumerproperties);
    consumer.subscribe(mqconfig.topic, mqconfig.tag, new doubleelevenmessagelistener());//new对应的监听器
    consumer.start();
    logger.info("consumerconfig start success.");
    

    return consumer;

  }
}

cid和onsaddr一点要选对,用自己的,消费者线程数等可以在这里配置

创建消息监听器类,消费消息:

@component
public class messagelistener implements messagelistener {
  private logger logger = loggerfactory.getlogger("remind");

  protected static elevenreposity elevenreposity;
  @resource
  public void setelevenreposity(elevenreposity elevenreposity){
    messagelistener .elevenreposity=elevenreposity;
  }


  @override
  public action consume(message message, consumecontext consumecontext) {

    if(message.gettopic().equals("自己的topic")){//避免消费到其他消息 json转换报错
      try {

      byte[] body = message.getbody();
      string res = new string(body);
      
      //res 是生产者传过来的消息内容

        //业务代码

      }else{
        logger.warn("!");
      }

      } catch (exception e) {
        logger.error("messagelistener.consume error:" + e.getmessage(), e);
      }

      logger.info("messagelistener.receive message”);
      //如果想测试消息重投的功能,可以将action.commitmessage 替换成action.reconsumelater
      return action.commitmessage;
    }else{
      logger.warn();
      return action.reconsumelater;
    }

  }

注意,由于消费者是多线程的,所以对象要用static+set注入,把对象的级别提升到进程,这样多个线程就可以共用,但是无法调用父类的方法和变量

浅谈Springboot整合RocketMQ使用心得

消费者状态可以查看消费者是否连接成功,消费是否延迟,消费速度等

重置消费位点可以清空所有消息

三、注意事项

1、发送的消息体 最大为256kb

2、消息最多存在3天

3、消费端默认线程数是20

4、如果运行过程中出现java挂掉或者cpu占用异常高,可以在发送消息的时候,每发送1000条让线程休息1s

5、本地测试或启动的时候,把onsaddr换成公网,不然报错无法启动

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

上一篇:

下一篇: