基于EMQX以切分消息方式发送文件 EMQXfilebyte数组
程序员文章站
2022-04-06 11:21:31
...
1、引入jar包如下:
2、代码如下:
1> mqtt-client-1.16-SNAPSHOT.jar 2> hawtbuf-1.11.jar 3> hawtdispatch-1.22.jar 4> hawtdispatch-transport-1.22.jar
2、代码如下:
package com.cnd.poc; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.URISyntaxException; import java.net.URLEncoder; import org.fusesource.mqtt.client.Future; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; public class TestMqttClientFuture { private static String EMQX_Cluster_URL = "mqttpoc.yourcompany.com"; private static int EMQX_Cluster_Broker_Port = 1883; private static String EMQX_Cluster_Access_UserName = "can_do"; private static String EMQX_Cluster_Access_Password = "passw0rd"; private static String EMQX_Cluster_Topic_Name = "YourTm/01012345678"; public static void main(String[] args) { // TODO Auto-generated method stub MQTT mqtt = new MQTT(); try { mqtt.setHost(EMQX_Cluster_URL, EMQX_Cluster_Broker_Port); mqtt.setUserName(EMQX_Cluster_Access_UserName); mqtt.setPassword(EMQX_Cluster_Access_Password); } catch (URISyntaxException e) { // TODO Auto-generated catch block e.printStackTrace(); } FutureConnection connection = mqtt.futureConnection(); boolean isFuture = true; Future<Void> f1 = connection.connect(); try { f1.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } Future<byte[]> f2 = connection.subscribe(new Topic[] { new Topic(EMQX_Cluster_Topic_Name, QoS.AT_LEAST_ONCE) }); try { byte[] qoses = f2.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } // We can start future receive.. Future<Message> receive = connection.receive(); // how to retrieve message from future object // send the message.. // send file with byte[] in splitted message boolean isSendFileMsg = false; if (isSendFileMsg) { Future<Void> f3 = connection.publish(EMQX_Cluster_Topic_Name, "Hello can_do 24".getBytes(), QoS.EXACTLY_ONCE, false); // Then the receive will get the message. Message message = null; try { message = receive.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } message.ack(); System.out.print("=message is:=" + new String(message.getPayload()) + "=end="); } String fileName = "E:\\to_removed\\20190610\\test_mqtt_file_transfer_del.txt"; int permFragmentSize = 64; boolean isDelFileAfterSent = false; sendFileInByteArray(connection, fileName, permFragmentSize, isDelFileAfterSent); if (!isFuture) { Future<Void> f4 = connection.disconnect(); try { f4.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /** * send file with byte array */ public static boolean sendFileInByteArray(FutureConnection paramConn, String paramFileAbsPath, int paramPermFragmentSize, boolean delWhenFinished) { boolean isSendSuccessful = false; File file = new File(paramFileAbsPath); InputStream in = null; int PER_FRAGMENT_SIZE = paramPermFragmentSize; try { String encodedName = URLEncoder.encode(file.getName(), "UTF-8"); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { byte[] block = new byte[PER_FRAGMENT_SIZE]; int byteread = 0; in = new FileInputStream(paramFileAbsPath); int i = 0; while ((byteread = in.read(block)) != -1) { i++; paramConn.publish(EMQX_Cluster_Topic_Name, block, QoS.EXACTLY_ONCE, false); // Note:you need to clear byte[] after every time sent // This statement is vip block = new byte[PER_FRAGMENT_SIZE]; } System.out.println("=i is:=" + i + "=end="); isSendSuccessful = true; } catch (Exception e1) { e1.printStackTrace(); if (in != null) try { in.close(); } catch (IOException localIOException) { } } finally { if (in != null) try { in.close(); } catch (IOException localIOException1) { } } if (isSendSuccessful) { if (delWhenFinished) { String strFileReadme = file.getName().split("\\[_]")[0]; File tmpFile = new File(file.getParent() + File.separator + strFileReadme); if (tmpFile.exists()) { tmpFile.delete(); } file.delete(); } } return isSendSuccessful; } }
下一篇: PHP如何获取Cookie并实现模拟登录