欢迎您访问程序员文章站，本站旨在为大家提供和分享程序员计算机编程知识！
您现在的位置是: 首页

基于EMQX以切分消息方式发送文件（关键词：EMQX、file、byte数组）

程序员文章站 2022-04-06 11:21:31
...
1、引入jar包如下:
1> mqtt-client-1.16-SNAPSHOT.jar
2> hawtbuf-1.11.jar
3> hawtdispatch-1.22.jar
4> hawtdispatch-transport-1.22.jar


2、代码如下:

package com.cnd.poc;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.net.URLEncoder;

import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

/**
 * Demo MQTT client that connects to an EMQX cluster through the fusesource
 * {@link FutureConnection} API, subscribes to one topic and publishes a file to
 * that same topic as a sequence of fixed-size byte[] fragments.
 */
public class TestMqttClientFuture {

	/** Host name of the EMQX cluster (placeholder value). */
	private static final String EMQX_Cluster_URL = "mqttpoc.yourcompany.com";

	/** Broker port handed to {@code MQTT.setHost}. */
	private static final int EMQX_Cluster_Broker_Port = 1883;

	/** User name used to authenticate against the broker. */
	private static final String EMQX_Cluster_Access_UserName = "can_do";

	// NOTE(review): demo credential kept in source; load it from configuration for real use.
	private static final String EMQX_Cluster_Access_Password = "passw0rd";

	/** Topic that is both subscribed to and published on. */
	private static final String EMQX_Cluster_Topic_Name = "YourTm/01012345678";

	/**
	 * Connects, subscribes to the topic, optionally performs a text-message round
	 * trip, and then publishes the configured file in fragments.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		MQTT mqtt = new MQTT();
		try {
			mqtt.setHost(EMQX_Cluster_URL, EMQX_Cluster_Broker_Port);
			mqtt.setUserName(EMQX_Cluster_Access_UserName);
			mqtt.setPassword(EMQX_Cluster_Access_Password);
		} catch (URISyntaxException e) {
			// Without a valid host there is nothing to connect to.
			e.printStackTrace();
			return;
		}

		FutureConnection connection = mqtt.futureConnection();

		// When true the connection is left open at the end of main (no disconnect).
		boolean isFuture = true;

		Future<Void> connected = connection.connect();
		try {
			connected.await();
		} catch (Exception e) {
			// Subscribing or publishing on a connection that failed to open is pointless.
			e.printStackTrace();
			return;
		}

		Future<byte[]> subscribed = connection
				.subscribe(new Topic[] { new Topic(EMQX_Cluster_Topic_Name, QoS.AT_LEAST_ONCE) });
		try {
			subscribed.await();
		} catch (Exception e) {
			// Publishing does not depend on the subscription, so carry on.
			e.printStackTrace();
		}

		// Start a receive before publishing so the echoed message can be picked up.
		Future<Message> receive = connection.receive();

		// Optional round trip of a plain text message; disabled by default.
		boolean isSendTextMsg = false;
		if (isSendTextMsg) {
			connection.publish(EMQX_Cluster_Topic_Name, "Hello can_do 24".getBytes(), QoS.EXACTLY_ONCE, false);

			Message message = null;
			try {
				message = receive.await();
			} catch (Exception e) {
				e.printStackTrace();
			}
			// message stays null when the receive failed; only ack/print a real message.
			if (message != null) {
				message.ack();
				System.out.print("=message is:=" + new String(message.getPayload()) + "=end=");
			}
		}

		String fileName = "E:\\to_removed\\20190610\\test_mqtt_file_transfer_del.txt";

		int permFragmentSize = 64;

		boolean isDelFileAfterSent = false;

		sendFileInByteArray(connection, fileName, permFragmentSize, isDelFileAfterSent);

		if (!isFuture) {
			Future<Void> disconnected = connection.disconnect();
			try {
				disconnected.await();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Publishes the content of a file to {@code EMQX_Cluster_Topic_Name} as a
	 * sequence of byte[] messages of at most {@code paramPermFragmentSize} bytes
	 * each, using QoS EXACTLY_ONCE and no retain flag.
	 *
	 * <p>Each message carries exactly the bytes returned by one read, so the last
	 * fragment is not padded with zero bytes.
	 *
	 * <p>The futures returned by {@code publish} are not awaited here, so a
	 * {@code true} result means every fragment was handed to the client, not that
	 * the broker acknowledged it.
	 *
	 * @param paramConn             connection used for publishing
	 * @param paramFileAbsPath      path of the file to send
	 * @param paramPermFragmentSize maximum payload size per message; must be positive
	 * @param delWhenFinished       when true, the file (and its companion file, see
	 *                              below) is deleted after a successful send
	 * @return true when the whole file was read and published without an exception;
	 *         false otherwise, including when the fragment size is not positive
	 */
	public static boolean sendFileInByteArray(FutureConnection paramConn, String paramFileAbsPath,
			int paramPermFragmentSize, boolean delWhenFinished) {
		File file = new File(paramFileAbsPath);

		// A zero-length buffer would make read() return 0 forever; a negative size cannot be allocated.
		if (paramPermFragmentSize <= 0) {
			System.err.println("Fragment size must be positive: " + paramPermFragmentSize);
			return false;
		}

		boolean isSendSuccessful = false;
		InputStream in = null;
		try {
			in = new FileInputStream(file);

			byte[] buffer = new byte[paramPermFragmentSize];
			int fragmentCount = 0;
			int bytesRead;
			while ((bytesRead = in.read(buffer)) != -1) {
				// Publish a private copy holding only the bytes actually read: the read
				// buffer is reused on the next iteration while publish() is asynchronous,
				// and a short read must not be sent with stale or zero padding.
				byte[] fragment = new byte[bytesRead];
				System.arraycopy(buffer, 0, fragment, 0, bytesRead);
				paramConn.publish(EMQX_Cluster_Topic_Name, fragment, QoS.EXACTLY_ONCE, false);
				fragmentCount++;
			}
			System.out.println("=i is:=" + fragmentCount + "=end=");

			isSendSuccessful = true;
		} catch (Exception e) {
			// Covers I/O failures as well as runtime failures raised while publishing.
			e.printStackTrace();
		} finally {
			if (in != null) {
				try {
					in.close();
				} catch (IOException ignored) {
					// Nothing useful can be done if closing the input stream fails.
				}
			}
		}

		if (isSendSuccessful && delWhenFinished) {
			// NOTE(review): this regex matches the literal text "[_]"; confirm that is the
			// intended separator between the companion file name and the rest.
			String strFileReadme = file.getName().split("\\[_]")[0];
			// Resolve against the parent File so a path without a parent does not become "null/...".
			File tmpFile = new File(file.getParentFile(), strFileReadme);
			if (tmpFile.exists()) {
				tmpFile.delete();
			}
			file.delete();
		}
		return isSendSuccessful;
	}

}