springboot集成mqtt的实践开发
程序员文章站
2024-02-20 12:25:10
序
mqtt(message queuing telemetry transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限...
序
mqtt(message queuing telemetry transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的iot场景。这里简单介绍一下如何在springboot中集成。
maven
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-integration</artifactid> </dependency> <dependency> <groupid>org.springframework.integration</groupid> <artifactid>spring-integration-stream</artifactid> </dependency> <dependency> <groupid>org.springframework.integration</groupid> <artifactid>spring-integration-mqtt</artifactid> </dependency>
配置client factory
@bean public mqttpahoclientfactory mqttclientfactory() { defaultmqttpahoclientfactory factory = new defaultmqttpahoclientfactory(); factory.setserveruris("tcp://demo:1883"); // factory.setusername("guest"); // factory.setpassword("guest"); return factory; }
配置consumer
@bean public integrationflow mqttinflow() { return integrationflows.from(mqttinbound()) .transform(p -> p + ", received from mqtt") .handle(logger()) .get(); } private logginghandler logger() { logginghandler logginghandler = new logginghandler("info"); logginghandler.setloggername("sisample"); return logginghandler; } @bean public messageproducersupport mqttinbound() { mqttpahomessagedrivenchanneladapter adapter = new mqttpahomessagedrivenchanneladapter("sisampleconsumer", mqttclientfactory(), "sisampletopic"); adapter.setcompletiontimeout(5000); adapter.setconverter(new defaultpahomessageconverter()); adapter.setqos(1); return adapter; }
配置producer
@bean public integrationflow mqttoutflow() { //console input // return integrationflows.from(characterstreamreadingmessagesource.stdin(), // e -> e.poller(pollers.fixeddelay(1000))) // .transform(p -> p + " sent to mqtt") // .handle(mqttoutbound()) // .get(); return integrationflows.from(outchannel()) .handle(mqttoutbound()) .get(); } @bean public messagechannel outchannel() { return new directchannel(); } @bean public messagehandler mqttoutbound() { mqttpahomessagehandler messagehandler = new mqttpahomessagehandler("sisamplepublisher", mqttclientfactory()); messagehandler.setasync(true); messagehandler.setdefaulttopic("sisampletopic"); return messagehandler; }
配置messaginggateway
@messaginggateway(defaultrequestchannel = "outchannel") public interface msgwriter { void write(string note); }
这样就大功告成了
doc
spring-integration-samples-mqtt
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: 关于Spring总结(必看篇)
推荐阅读
-
Springboot2.X集成redis集群(Lettuce)连接的方法
-
SpringBoot集成slf4j+log4j2的示例代码
-
SpringBoot集成MyBatis的分页插件PageHelper实例代码
-
图数据库实践系列 (三)--Neo4j Spatial的REST集成
-
SpringBoot入门之集成JSP的示例代码
-
软件开发文档的持续集成 软件测试项目管理UML工作单元测试
-
springboot 1.5.2 集成kafka的简单例子
-
SpringBoot入门之集成Druid的方法示例
-
创建SpringBoot工程并集成Mybatis的方法
-
创建SpringBoot工程并集成Mybatis的方法