欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Android使用的MQTT客户端

程序员文章站 2022-05-18 21:05:17
...

Android使用的MQTT客户端,支持订阅、发送消息;支持创建连接到本地保存;支持话题消息筛选;
使用视频:https://dwz.cn/undJFEnq
小米应用商店也有~

Android使用的MQTT客户端

核心代码贴一下,做个记录


import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.os.IBinder;

import androidx.annotation.Nullable;

import com.annimon.stream.Collectors;
import com.annimon.stream.Stream;
import com.freddon.android.snackkit.extension.regex.RegexHelper;
import com.freddon.android.snackkit.extension.tools.NetSuit;
import com.freddon.android.snackkit.log.Loger;
import com.qiniu.util.StringUtils;
import com.sagocloud.ntworker.agent.App;
import com.sagocloud.ntworker.agent.RxEventBus;
import com.sagocloud.ntworker.mqtt.ActionEventType;
import com.sagocloud.ntworker.mqtt.EventType;
import com.sagocloud.ntworker.mqtt.bean.MQTTConnectUserEntity;
import com.sagocloud.ntworker.mqtt.bean.MQTTMessage;
import com.sagocloud.ntworker.mqtt.bean.MqttConnectPoint;
import com.sagocloud.ntworker.mqtt.event.MQTTClientActionEvent;
import com.sagocloud.ntworker.mqtt.event.MQTTTransferEvent;
import com.sagocloud.ntworker.mqtt.event.MQTTMessageEvent;
import com.sagocloud.ntworker.mqtt.event.MQTTStateEvent;
import com.sagocloud.ntworker.mqtt.event.MQTTTraceEvent;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.android.service.MqttTraceHandler;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;

public class MQTTService extends Service {

    public final static String CONN = "CON_MQTT_CF";
    private MqttAndroidClient mqttAndroidClient;
    private MQTTMessageEvent mQTTConnectEvent;
    private MQTTConnectUserEntity connectPoint;
    private MqttConnectOptions mMqttConnectOptions;

    private MqttCallback mqttCallback = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            mQTTConnectEvent = new MQTTMessageEvent();
            mQTTConnectEvent.setType(EventType.connectionLost);
            Loger.e("????connectionLost:", cause.getMessage());
            RxEventBus.post(mQTTConnectEvent);
            RxEventBus.post(new MQTTStateEvent(App.mqttIsConnected = false));
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            mQTTConnectEvent = new MQTTMessageEvent();
            mQTTConnectEvent.setType(EventType.messageArrived);
            mQTTConnectEvent.setTopic(topic);
            mQTTConnectEvent.setMessage(message);
            Loger.e("✉️messageArrived:", topic);
            RxEventBus.post(mQTTConnectEvent);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            mQTTConnectEvent = new MQTTMessageEvent();
            mQTTConnectEvent.setType(EventType.deliveryComplete);
            try {
                mQTTConnectEvent.setTopic(StringUtils.join(token.getTopics(), ","));
                mQTTConnectEvent.setMessage(token.getMessage());
            } catch (MqttException e) {
                e.printStackTrace();
            }
            Loger.e("????deliveryComplete:", token.toString());
            RxEventBus.post(mQTTConnectEvent);
        }
    };

    private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
            RxEventBus.post(new MQTTTransferEvent(asyncActionToken, null));
            Loger.e("????onSuccess:", "" + Arrays.toString(asyncActionToken.getTopics()));
            App.mqttIsConnected = true;
            RxEventBus.post(new MQTTStateEvent(true));
            DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
            disconnectedBufferOptions.setBufferEnabled(true);
            disconnectedBufferOptions.setBufferSize(100);
            disconnectedBufferOptions.setPersistBuffer(false);
            disconnectedBufferOptions.setDeleteOldestMessages(false);
            mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
        }

        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
            RxEventBus.post(new MQTTTransferEvent(asyncActionToken, exception.getMessage()));
            Loger.e("????onFailure:", "" + exception.getMessage());
            App.mqttIsConnected = false;
            RxEventBus.post(new MQTTStateEvent(false));
        }
    };
    private CompositeDisposable subscription;
    private MqttTraceHandler traceCallback = new MqttTraceHandler() {
        @Override
        public void traceDebug(String tag, String message) {
            Loger.e("????traceDebug:" + tag, "" + message);
//            LiveDataBus.post(MQTTTraceEvent.class,new MQTTTraceEvent(MQTTTraceEvent.Type.DEBUG, tag, message));
            RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.DEBUG, tag, message));
        }

        @Override
        public void traceError(String tag, String message) {
            Loger.e("????traceError:" + tag, "" + message);
//            LiveDataBus.post(MQTTTraceEvent.class,new MQTTTraceEvent(MQTTTraceEvent.Type.ERROR, tag, message));
            RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.ERROR, tag, message));
        }

        @Override
        public void traceException(String tag, String message, Exception e) {
            Loger.e("????traceException:" + tag, "" + message + e.getMessage());
            RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.EXCEPTION, tag, message));
//            LiveDataBus.post(MQTTTraceEvent.class,new MQTTTraceEvent(MQTTTraceEvent.Type.EXCEPTION, tag, message));
        }
    };
    private Disposable actSubscription;
    private Disposable actTimerSubscription;


    public static void startService(Context context, MQTTConnectUserEntity point) {
        Intent service = new Intent();
        service.setClass(context, MQTTService.class);
        service.putExtra(CONN, point);
        context.startService(service);
    }

    private void $prepareActionHandler() {
        if (subscription == null) {
            subscription = new CompositeDisposable();
        }
        subscription.clear();
        if (actSubscription != null && actSubscription.isDisposed()) {
            actSubscription.dispose();
        }
        if (actTimerSubscription != null && actTimerSubscription.isDisposed()) {
            actTimerSubscription.dispose();
        }
        actSubscription = RxEventBus.subscribeIOEvent(
                MQTTClientActionEvent.class,
                event -> {
                    ActionEventType type = event.getEventType();
                    Object payload = event.getPayload();
                    switch (type) {
                        case connect:
                            connect();
                            break;
                        case publish:
                            if (payload instanceof MQTTMessage) {
                                publish((MQTTMessage) payload);
                            }
                            break;
                        case subscribe:
                            if (payload instanceof MQTTMessage) {
                                subscribe((MQTTMessage) payload);
                            }
                            break;
                        case unsubscribe:
                            if (payload instanceof MQTTMessage) {
                                unsubscribe((MQTTMessage) payload);
                            }
                            break;
                        case unsubscribe_all:
                            if (payload instanceof String[]) {
                                unsubscribeAll((String[]) payload);
                            }
                            break;
                        case close:
                            disconnect();
                            mqttAndroidClient = null;
                            App.mqttIsConnected = false;
                            RxEventBus.post(new MQTTStateEvent(false));
                            stopSelf();
                            break;
                    }
                },
                error -> {
                    Loger.d("error", error.getMessage());
                }
        );
        actTimerSubscription = Observable.interval(2000, TimeUnit.MILLISECONDS)
                .subscribe((i) -> {
                    App.mqttIsConnected = mqttAndroidClient != null && mqttAndroidClient.isConnected();
                    RxEventBus.post(new MQTTStateEvent(App.mqttIsConnected));
                });
        subscription.add(actSubscription);
        subscription.add(actTimerSubscription);
    }

    @Override
    public void onDestroy() {
        disconnect();
        RxEventBus.unsubscribeEvent(subscription);
        super.onDestroy();
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        if (intent != null) {
            connectPoint = intent.getParcelableExtra(CONN);
            if (connectPoint != null) {
                $prepareActionHandler();
                connect();
            } else {
                if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
                    connect();
                }
            }
        }
        return super.onStartCommand(intent, flags, startId);
    }

    private void $prepared(MqttConnectPoint connectPoint) {
        String serverURI = String.format(Locale.ENGLISH, "%s://%s:%s", connectPoint.isUseSSL() ? "ssl" : "tcp", connectPoint.getHost(), connectPoint.getPort());
        if (mqttAndroidClient != null) {
            disconnect();
        }
        mqttAndroidClient = new MqttAndroidClient(this, serverURI, connectPoint.getClientId());
        mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调
        mqttAndroidClient.setTraceEnabled(true);
        mqttAndroidClient.setTraceCallback(traceCallback);
        mMqttConnectOptions = new MqttConnectOptions();
        mMqttConnectOptions.setMqttVersion(connectPoint.getVersion());
        mMqttConnectOptions.setMaxInflight(connectPoint.getMaxInflight());
        mMqttConnectOptions.setAutomaticReconnect(connectPoint.isAutoReconnect());
        mMqttConnectOptions.setCleanSession(connectPoint.isClearSession()); //设置是否清除缓存
        mMqttConnectOptions.setConnectionTimeout(connectPoint.getConnectTimeout()); //设置超时时间,单位:秒
        mMqttConnectOptions.setKeepAliveInterval(connectPoint.getTickTime()); //设置心跳包发送间隔,单位:秒
        if (RegexHelper.isAllNotEmpty(connectPoint.getUserName(), connectPoint.getUserPasswort())) {
            mMqttConnectOptions.setUserName(connectPoint.getUserName()); //设置用户名
            mMqttConnectOptions.setPassword(connectPoint.getUserPasswort().toCharArray()); //设置密码
        }
        if (connectPoint.isUseSSL() && connectPoint.getSslProperties() != null) {
            mMqttConnectOptions.setSSLProperties(connectPoint.getSslProperties());
        }
        if (RegexHelper.isNotEmpty(connectPoint.getLwt())) {
            mMqttConnectOptions.setWill(connectPoint.getLwt().getTopic(), connectPoint.getLwt().getMessage().getBytes(), connectPoint.getLwt().getQos(), connectPoint.getLwt().isRetained());
        }
    }

    private void connect() {
        if (mqttAndroidClient == null) {
            $prepared(connectPoint);
        }
        if (!mqttAndroidClient.isConnected() && NetSuit.checkEnable(this)) {
            try {
                mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

    }

    private void disconnect() {
        try {
            if (mqttAndroidClient == null) return;
            mqttAndroidClient.unregisterResources();
            mqttAndroidClient.disconnect();
            mqttAndroidClient.close();
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            mqttAndroidClient = null;
        }

    }

    private void subscribe(MQTTMessage subscribe) {
        try {
            if (mqttAndroidClient == null || subscribe == null || subscribe.getTopic() == null)
                return;
            MqttMessage.validateQos(subscribe.getQos());
            List<MQTTMessage> sub = connectPoint.getSubTopics();
            if (sub == null) {
                sub = new ArrayList<>();
            }
            Long count = Stream.of(sub)
                    .filter(item -> subscribe.getTopic().equalsIgnoreCase(item.getTopic()))
                    .collect(Collectors.counting());
            if (count > 0) {
                return;
            }
            sub.add(subscribe);
            connectPoint.setSubTopics(sub);
            mqttAndroidClient.subscribe(subscribe.getTopic(), subscribe.getQos());
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void subscribeAll(String[] topics, int[] qos) {
        if (RegexHelper.isAnyEmpty(topics, qos)) return;
        if (mqttAndroidClient == null) return;
        if (topics.length != qos.length) return;
        try {
            mqttAndroidClient.subscribe(topics, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void unsubscribe(MQTTMessage subscribe) {
        try {
            if (mqttAndroidClient == null || connectPoint == null) return;
            List<MQTTMessage> sub = connectPoint.getSubTopics();
            if (sub != null) {
                List<MQTTMessage> filtered = Stream.of(sub)
                        .filter(item -> !subscribe.getTopic().equalsIgnoreCase(item.getTopic()))
                        .collect(Collectors.toList());
                connectPoint.setSubTopics(filtered);
            }
            mqttAndroidClient.unsubscribe(subscribe.getTopic());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void unsubscribeAll(String[] topics) {
        try {
            if (mqttAndroidClient == null) return;
            if (topics == null) mqttAndroidClient.unsubscribe("#");
            else {
                mqttAndroidClient.unsubscribe(topics);
            }
            if (connectPoint != null) {
                connectPoint.setSubTopics(null);
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void publish(MQTTMessage subscribe) {
        try {
            MqttMessage.validateQos(subscribe.getQos());
            mqttAndroidClient.publish(subscribe.getTopic(), subscribe.getMessage().getBytes(), subscribe.getQos(), subscribe.isRetained());
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}