
Integrating MQTT with Spring Boot: A Practical Guide

程序员文章站 2024-02-23 16:31:10


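MQTT (Message Queuing Telemetry Transport) is a binary publish/subscribe messaging protocol. It is well suited to IoT scenarios where devices must run on low power and network bandwidth is limited. This article briefly shows how to integrate it into a Spring Boot application using Spring Integration.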
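To make the publish/subscribe idea concrete, here is a minimal in-memory sketch. It is not MQTT and involves no network; `ToyBroker` and `ToyBrokerDemo` are hypothetical names invented for illustration. It only shows that publishers and subscribers know a topic name, not each other.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory broker: illustrates publish/subscribe only, not the MQTT wire protocol. */
class ToyBroker {
  private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

  /** Registers a callback for an exact topic name (this sketch has no wildcard support). */
  void subscribe(String topic, Consumer<String> callback) {
    subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(callback);
  }

  /** Delivers the payload to every subscriber of the topic; returns how many received it. */
  int publish(String topic, String payload) {
    List<Consumer<String>> callbacks =
        subscribers.getOrDefault(topic, Collections.emptyList());
    callbacks.forEach(cb -> cb.accept(payload));
    return callbacks.size();
  }
}

class ToyBrokerDemo {
  public static void main(String[] args) {
    ToyBroker broker = new ToyBroker();
    broker.subscribe("sisampletopic", msg -> System.out.println("consumer A got: " + msg));
    broker.subscribe("sisampletopic", msg -> System.out.println("consumer B got: " + msg));
    broker.publish("sisampletopic", "hello"); // both consumers are invoked
    broker.publish("othertopic", "ignored");  // no subscribers, so nothing is delivered
  }
}
```

A real MQTT broker adds what this sketch leaves out: network transport, topic wildcards, delivery guarantees, and session state.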

Maven

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
    </dependency>

Configuring the client factory

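  import org.springframework.context.annotation.Bean;
  import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

  // Shared by the inbound adapter and the outbound handler.
  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setServerURIs("tcp://demo:1883"); // broker address
    // Uncomment if the broker requires authentication:
    // factory.setUserName("guest");
    // factory.setPassword("guest");
    return factory;
  }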
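The setters used above belong to older Spring Integration releases. Later 5.x releases, as far as I know, move the broker URI and credentials onto Paho's `MqttConnectOptions`, which is passed to the factory through `setConnectionOptions`. The following is a sketch of that variant, assuming such a version is on the classpath. The class name `MqttClientConfig` is hypothetical, and the bean is an alternative to the one above, not an addition to it.

```java
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

@Configuration
class MqttClientConfig { // hypothetical class name

  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    // Connection details live on the Paho options object in this variant.
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[] {"tcp://demo:1883"});
    // Uncomment if the broker requires authentication:
    // options.setUserName("guest");
    // options.setPassword("guest".toCharArray());

    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(options);
    return factory;
  }
}
```

Note that `MqttConnectOptions.setPassword` takes a `char[]` rather than a `String`.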

Configuring the consumer

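  import org.springframework.context.annotation.Bean;
  import org.springframework.integration.dsl.IntegrationFlow;
  import org.springframework.integration.dsl.IntegrationFlows;
  import org.springframework.integration.endpoint.MessageProducerSupport;
  import org.springframework.integration.handler.LoggingHandler;
  import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;

  // Inbound flow: MQTT message -> append a suffix -> log it.
  @Bean
  public IntegrationFlow mqttInFlow() {
    return IntegrationFlows.from(mqttInbound())
        .transform(p -> p + ", received from mqtt")
        .handle(logger())
        .get();
  }

  private LoggingHandler logger() {
    LoggingHandler loggingHandler = new LoggingHandler("INFO");
    loggingHandler.setLoggerName("sisample");
    return loggingHandler;
  }

  // Subscribes to "sisampletopic" with the client ID "sisampleconsumer".
  @Bean
  public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("sisampleconsumer",
        mqttClientFactory(), "sisampletopic");
    adapter.setCompletionTimeout(5000); // milliseconds
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1); // at-least-once delivery
    return adapter;
  }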
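The `1` passed to `setQos` above is one of three MQTT quality-of-service levels. The small enum below is a hypothetical helper, not part of Spring Integration or Paho. It spells the levels out so the choice is visible in code.

```java
/** The three delivery guarantees defined by MQTT (hypothetical helper enum). */
enum MqttQos {
  AT_MOST_ONCE(0),  // fire and forget: a message may be lost, never duplicated
  AT_LEAST_ONCE(1), // acknowledged: resent until acked, so duplicates are possible
  EXACTLY_ONCE(2);  // two-phase handshake: neither lost nor duplicated

  private final int level;

  MqttQos(int level) {
    this.level = level;
  }

  /** Numeric level as used on the wire and by adapter setters that take an int. */
  int level() {
    return level;
  }

  /** Maps 0, 1 or 2 to the matching constant and rejects anything else. */
  static MqttQos fromLevel(int level) {
    for (MqttQos qos : values()) {
      if (qos.level == level) {
        return qos;
      }
    }
    throw new IllegalArgumentException("MQTT QoS must be 0, 1 or 2, got " + level);
  }
}
```

With such a helper the adapter call could read `adapter.setQos(MqttQos.AT_LEAST_ONCE.level())`. Because level 1 can deliver a message more than once, a consumer at this level should tolerate duplicates.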

Configuring the producer

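  import org.springframework.context.annotation.Bean;
  import org.springframework.integration.channel.DirectChannel;
  import org.springframework.integration.dsl.IntegrationFlow;
  import org.springframework.integration.dsl.IntegrationFlows;
  import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  import org.springframework.messaging.MessageChannel;
  import org.springframework.messaging.MessageHandler;

  // Outbound flow: anything sent to outChannel is published to MQTT.
  @Bean
  public IntegrationFlow mqttOutFlow() {
    // Alternative source: read lines from the console (needs spring-integration-stream)
    // return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
    //     e -> e.poller(Pollers.fixedDelay(1000)))
    //     .transform(p -> p + " sent to mqtt")
    //     .handle(mqttOutbound())
    //     .get();
    return IntegrationFlows.from(outChannel())
        .handle(mqttOutbound())
        .get();
  }

  // The bean name "outChannel" is what the messaging gateway refers to.
  @Bean
  public MessageChannel outChannel() {
    return new DirectChannel();
  }

  @Bean
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("sisamplepublisher", mqttClientFactory());
    messageHandler.setAsync(true); // do not block the caller while waiting for delivery
    messageHandler.setDefaultTopic("sisampletopic");
    return messageHandler;
  }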

Configuring the MessagingGateway

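  import org.springframework.integration.annotation.MessagingGateway;

  // Spring generates the implementation; each write() call sends the note to outChannel.
  @MessagingGateway(defaultRequestChannel = "outChannel")
  public interface MsgWriter {
    void write(String note);
  }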
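The article does not show the gateway being called, so here is a minimal sketch of application code that depends on it. `NoteService` and `NoteServiceDemo` are hypothetical names, and the interface is repeated only so the snippet compiles on its own. In the real application Spring injects the generated gateway proxy. In the demo a lambda stands in for it, which is also a convenient way to unit test such a service without a broker.

```java
/** Same shape as the gateway interface above, repeated so the sketch is self-contained. */
interface MsgWriter {
  void write(String note);
}

/** Hypothetical service; in the application Spring would inject the gateway proxy. */
class NoteService {
  private final MsgWriter writer;

  NoteService(MsgWriter writer) {
    this.writer = writer;
  }

  /** Trims the note and hands it to the gateway; blank notes are skipped. */
  boolean send(String note) {
    if (note == null || note.trim().isEmpty()) {
      return false;
    }
    writer.write(note.trim());
    return true;
  }
}

class NoteServiceDemo {
  public static void main(String[] args) {
    // Stand-in for the Spring proxy: prints instead of publishing to the broker.
    NoteService service = new NoteService(note -> System.out.println("would publish: " + note));
    service.send(" hello mqtt ");
  }
}
```

Because the gateway is a single-method interface, the service never touches Spring Integration or Paho types directly. Only the configuration classes know that MQTT is involved.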

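That completes the setup. Calling write(...) on the gateway publishes the note to sisampletopic, and the inbound flow logs every message received on that topic.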

Doc


spring-integration-samples-mqtt
