欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  移动技术

使用Kotlin+RocketMQ实现延时消息的示例代码

程序员文章站 2022-05-14 15:26:39
一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。 使用延时消息的典型场景,例如: 在...

一. 延时消息

延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

使用延时消息的典型场景,例如:

  • 在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消。
  • 在电商系统中,用户七天内没有评价商品,则默认好评。

这些场景对应的解决方案,包括:

  • 轮询遍历数据库记录
  • jdk 的 delayqueue
  • scheduledexecutorservice
  • 基于 quartz 的定时任务
  • 基于 redis 的 zset 实现延时队列。

除此之外,还可以使用消息队列来实现延时消息,例如 rocketmq。

二. rocketmq

rocketmq 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。rocketmq 是2012年阿里巴巴开源的第三代分布式消息中间件。

使用Kotlin+RocketMQ实现延时消息的示例代码

三. rocketmq 实现延时消息

3.1 业务背景

我们的系统完成某项操作之后,会推送事件消息到业务方的接口。当我们调用业务方的通知接口返回值为成功时,表示本次推送消息成功;当返回值为失败时,则会多次推送消息,直到返回成功为止(保证至少成功一次)。
当我们推送失败后,虽然会进行多次推送消息,但并不是立即进行。会有一定的延迟,并按照一定的规则进行推送消息。
例如:1小时后尝试推送、3小时后尝试推送、1天后尝试推送、3天后尝试推送等等。因此,考虑使用延时消息实现该功能。

3.2 生产者(producer)

生产者负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。

首先,定义一个支持延时发送的 abstractproducer。

abstract class abstractproducer :producerbean() {
  var producerid: string? = null
  var topic: string? = null
  var tag: string?=null
  var timeoutmillis: int? = null
  var delaysendtimemills: long? = null

  val log = logfactory.getlog(this.javaclass)

  open fun sendmessage(messagebody: any, tag: string) {
    val msgbody = json.tojsonstring(messagebody)
    val message = message(topic, tag, msgbody.tobytearray())

    if (delaysendtimemills != null) {
      val startdelivertime = system.currenttimemillis() + delaysendtimemills!!
      message.startdelivertime = startdelivertime
      log.info( "send delay message producer startdelivertime:${startdelivertime}currenttime :${system.currenttimemillis()}")
    }
    val logmessageid = buildlogmessageid(message)
    try {
      val sendresult = send(message)
      log.info(logmessageid + "producer messageid: " + sendresult.getmessageid() + "\n" + "messagebody: " + msgbody)
    } catch (e: exception) {
      log.error(logmessageid + "messagebody: " + msgbody + "\n" + " error: " + e.message, e)
    }

  }

  fun buildlogmessageid(message: message): string {
    return "topic: " + message.topic + "\n" +
        "producer: " + producerid + "\n" +
        "tag: " + message.tag + "\n" +
        "key: " + message.key + "\n"
  }
}

根据业务需要,增加一个支持重试机制的 producer

@component
@configurationproperties("mqs.ons.producers.xxx-producer")
@configuration
@data
class cleanreportpusheventproducer :abstractproducer() {

  lateinit var delaysecondlist:list<long>

  fun sendmessage(messagebody: cleanreportpusheventmessage){
    //重试超过次数之后不再发事件
    if (delaysecondlist!=null) {

      if(messagebody.times>=delaysecondlist.size){
        return
      }
      val msgbody = json.tojsonstring(messagebody)
      val message = message(topic, tag, msgbody.tobytearray())
      val delaytimemills = delaysecondlist[messagebody.times]*1000l
      message.startdelivertime = system.currenttimemillis() + delaytimemills
      log.info( "messagebody: " + msgbody+ "startdelivertime: "+message.startdelivertime )
      val logmessageid = buildlogmessageid(message)
      try {
        val sendresult = send(message)
        log.info(logmessageid + "producer messageid: " + sendresult.getmessageid() + "\n" + "messagebody: " + msgbody)
      } catch (e: exception) {
        log.error(logmessageid + "messagebody: " + msgbody + "\n" + " error: " + e.message, e)
      }
    }
  }
}

在 cleanreportpusheventproducer 中,超过了重试的次数就不会再发送消息了。

每一次延时消息的时间也会不同,因此需要根据重试的次数来获取这个delaytimemills 。

通过 system.currenttimemillis() + delaytimemills 可以设置 message 的 startdelivertime。然后调用 send(message) 即可发送延时消息。

我们使用商用版的 rocketmq,因此支持精度为秒级别的延迟消息。在开源版本中,rocketmq 只支持18个特定级别的延迟消息。:(

3.3 消费者(consumer)

消费者负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。

定义 push 类型的 abstractconsumer:

@data
abstract class abstractconsumer ():messagelistener{

  var consumerid: string? = null

  lateinit var subscribeoptions: list<subscribeoptions>

  var threadnums: int? = null

  val log = logfactory.getlog(this.javaclass)

  override fun consume(message: message, context: consumecontext): action {
    val logmessageid = buildlogmessageid(message)
    val body = string(message.body)
    try {
      log.info(logmessageid + " body: " + body)
      val result = consumeinternal(message, context, json.parseobject(body, getmessagebodytype(message.tag)))
      log.info(logmessageid + " result: " + result.name)
      return result
    } catch (e: exception) {
      if (message.reconsumetimes >= 3) {
        log.error(logmessageid + " error: " + e.message, e)
      }
      return action.reconsumelater
    }

  }

  abstract fun getmessagebodytype(tag: string): type?

  abstract fun consumeinternal(message: message, context: consumecontext, obj: any): action

  protected fun buildlogmessageid(message: message): string {
    return "topic: " + message.topic + "\n" +
        "consumer: " + consumerid + "\n" +
        "tag: " + message.tag + "\n" +
        "key: " + message.key + "\n" +
        "msgid:" + message.msgid + "\n" +
        "borntimestamp" + message.borntimestamp + "\n" +
        "startdelivertime:" + message.startdelivertime + "\n" +
        "reconsumetimes:" + message.reconsumetimes + "\n"
  }
}

再定义具体的消费者,并且在消费失败之后能够再发送一次消息。

@configuration
@configurationproperties("mqs.ons.consumers.clean-report-push-event-consumer")
@data
class cleanreportpusheventconsumer(val cleanreportservice: cleanreportservice,val eventproducer:cleanreportpusheventproducer):abstractconsumer() {

  val logger: logger = loggerfactory.getlogger(this.javaclass)

  override fun consumeinternal(message: message, context: consumecontext, obj: any): action {
    if(obj is cleanreportpusheventmessage){
      //清除事件
      logger.info("consumer clean-report event report_id:${obj.id} ")

      //消费失败之后再发送一次消息
      if(!cleanreportservice.sendcleanreportevent(obj.id)){
        val times = obj.times+1
        eventproducer.sendmessage(cleanreportpusheventmessage(obj.id,times))
      }
    }
    return action.commitmessage
  }

  override fun getmessagebodytype(tag: string): type? {
    return cleanreportpusheventmessage::class.java
  }
}

其中,cleanreportservice 的 sendcleanreportevent() 会通过 http 的方式调用业务方提供的接口,进行事件消息的推送。如果推送失败了,则会进行下一次的推送。(这里使用了 eventproducer 的 sendmessage() 方法再次投递消息,是因为要根据调用的http接口返回的内容来判断消息是否发送成功。)

最后,定义 consumerfactory

@component
class consumerfactory(val consumers: list<abstractconsumer>,val aliyunonsoptions: aliyunonsoptions) {

  val logger: logger = loggerfactory.getlogger(this.javaclass)


  @postconstruct
  fun start() {
    completablefuture.runasync{
      consumers.stream().foreach {
        val properties = buildproperties(it.consumerid!!, it.threadnums)
        val consumer = onsfactory.createconsumer(properties)
        if (it.subscribeoptions != null && !it.subscribeoptions!!.isempty()) {
          for (options in it.subscribeoptions!!) {
            consumer.subscribe(options.topic, options.tag, it)
          }
          consumer.start()
          val message = "\n".plus(
              it.subscribeoptions!!.stream().map{ a -> string.format("topic: %s, tag: %s has been started", a.topic, a.tag)}
                  .collect(collectors.tolist<any>()))
          logger.info(string.format("consumer: %s\n", message))
        }
      }
    }
  }

  private fun buildproperties(consumerid: string,threadnums: int?): properties {
    val properties = properties()
    properties.put(propertykeyconst.consumerid, consumerid)
    properties.put(propertykeyconst.accesskey, aliyunonsoptions.accesskey)
    properties.put(propertykeyconst.secretkey, aliyunonsoptions.secretkey)
    if (stringutils.isnotempty(aliyunonsoptions.onsaddr)) {
      properties.put(propertykeyconst.onsaddr, aliyunonsoptions.onsaddr)
    } else {
      // 测试环境接入rocketmq
      properties.put(propertykeyconst.namesrv_addr, aliyunonsoptions.nameserveraddress)
    }
    properties.put(propertykeyconst.consumethreadnums, threadnums!!)
    return properties
  }
}

四. 总结

正如本文开头曾介绍过,可以使用多种方式来实现延时消息。然而,我们的系统本身就大量使用了 rocketmq,借助成熟的 rocketmq 实现延时消息不失为一种可靠而又方便的方式。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。