SpringBoot整合rabbitmq实现mqtt订阅/发布之功能
程序员文章站
2022-04-15 17:56:20
一、整体架构示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。二、代码实现1.MqttGatewayimport org.springframework.integration.annotation.MessagingGateway;import org.springframework.integration.mqtt.support.MqttHeaders;import org.springframework.messaging.han......
一、整体架构
示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。
二、代码实现
1.MqttGateway
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic); }
2.订阅
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.configurationprocessor.json.JSONObject; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.*; import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; import java.util.Date; @Configuration @IntegrationComponentScan @Slf4j public class MqttReceiveConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; @Value("${spring.mqtt.completionTimeout}") private int completionTimeout; //连接超时 @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session, // 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息 mqttConnectOptions.setCleanSession(false); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 mqttConnectOptions.setKeepAliveInterval(20); factory.setConnectionOptions(mqttConnectOptions); return factory; } //接收通道 @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), "amq.topic"); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); adapter.setTaskScheduler(new ConcurrentTaskScheduler()); return adapter; } //通过通道获取数据 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { Object m = message.getPayload(); System.out.println("message ==> " + m); MessageHeaders headers = message.getHeaders(); System.out.println("headers ==> " + headers); String topic = headers.get(MqttHeaders.TOPIC).toString(); System.out.println("topic ==> " + topic); String string = message.getPayload().toString(); if (!string.isEmpty()) { System.out.println("string ==> " + string); } } // }; }
该处使用的url网络请求的数据。
3.发布
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @Configuration @IntegrationComponentScan @EnableIntegration public class MqttSenderConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setKeepAliveInterval(2); factory.setConnectionOptions(mqttConnectOptions); MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, factory); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
4. pom文件添加
<!--mqtt依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
5.application.properties添加
spring.mqtt.username=mqtt-test #MQTT-密码,需要解密 spring.mqtt.password=mqtt-test #MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://192.168.2.135:1883,tcp://192.168.2.133:1883 spring.mqtt.url=tcp://127.0.0.1:1883 #MQTT-默认的消息推送主题,实际可在调用接口时指定 spring.mqtt.default.topic=defaultTopic #两个客户端的clientId不能相同,生产者和消费者的clientId不能相同 spring.mqtt.client.id=${random.value} spring.mqtt.completionTimeout=3000
6.rabbitmq安装及部署参考
https://blog.csdn.net/weixin_44015043/article/details/104861596#comments_13065410
总结
以上是个人做项目时候做的 在做项目的时候问过别的博主 最终通行在我的电脑上的demo。
本文地址:https://blog.csdn.net/Chs_12/article/details/108238443