欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Android MQTT学习总结以及用法

程序员文章站 2022-03-06 08:14:56
Android MQTT学习总结以及用法前言MQTT是什么MQTT原理Android中使用MQTT前言记录在开发中学习,使用MQTT的经验,以及遇到的坑MQTT是什么 先上一段百度百科的说明MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情......

Android MQTT学习总结以及用法

前言

记录在开发中学习,使用MQTT的经验,以及遇到的坑

MQTT是什么

    先上一段百度百科的说明

MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
国内很多企业都广泛使用MQTT作为Android手机客户端与服务器端推送消息的协议。其中Sohu,Cmstop手机客户端中均有使用到MQTT作为消息推送消息。据Cmstop主要负责消息推送的高级研发工程师李文凯称,随着移动互联网的发展,MQTT由于开放源代码,耗电量小等特点,将会在移动消息推送领域会有更多的贡献,在物联网领域,传感器与服务器的通信,信息的收集,MQTT都可以作为考虑的方案之一。在未来MQTT会进入到我们生活的各各方面。
MQTT特点
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
6、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

Android中使用MQTT

1.引入MQTT的sdk
在项目的gradle文件中加入下面的代码

    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'



import android.content.Context;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.util.Log;



import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;


public class MessageReceiveImpl implements MessageReceive {
    private static final String TAG = "MessageReceiveImpl";
    private Context mContext;

    private String mTopic;
    /**
     * The sender tries its best to send the message to the receiver.
     * If the sending fails, the sender will continue to try again until
     * the receiver receives the message. At the same time, it ensures
     * that the receiver will not receive duplicate messages
     * because of message retransmission.
     */
    private final int MQTT_QOS = 2;
    private static MqttAndroidClient mMqttClient;
    private MqttConnectOptions mMqttConOpt;
    private UrlConfigManager mUrlConfigManager;

    private PublishSubject<String> mPushMessageSubject = PublishSubject.create();


    public MessageReceiveImpl(Context Context) {
        mContext = Context;
        mUrlConfigManager = UrlConfigManager.get(mContext);
    }

    @Override
    public void init(String uuid) {
        mTopic = "订阅的topic";
        String mqttHost = "链接地址";
        Log.i(TAG, "mqttHost : " + mqttHost + " mTopic : " + mTopic);
        mMqttClient = new MqttAndroidClient(mContext, mqttHost, uuid);
        mMqttClient.setCallback(mqttCallback);

        mMqttConOpt = new MqttConnectOptions();
        mMqttConOpt.setCleanSession(false);
        mMqttConOpt.setAutomaticReconnect(true);
    }

    @Override
    public void connect() {
        Log.d(TAG, "start connnect");
        if (mMqttClient != null && !mMqttClient.isConnected() && isNetConnected()) {
            try {
                mMqttClient.connect(mMqttConOpt, null, iMqttActionListener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public Observable<String> getMessageByPush() {
        return mPushMessageSubject;
    }

    private IMqttActionListener iMqttActionListener = new IMqttActionListener() {

        @Override
        public void onSuccess(IMqttToken arg0) {
            Log.i(TAG, "connected success ");
            try {
                mMqttClient.subscribe(mTopic, MQTT_QOS);
            } catch (MqttException e) {
                Log.e(TAG, "connected exception failure ");
                e.printStackTrace();
            }
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            Log.e(TAG, "connected failure");
            arg1.printStackTrace();
            //connect fail, reconnect
            connect();
        }
    };

    private MqttCallback mqttCallback = new MqttCallback() {

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {

            String arrivedPushMsg = new String(message.getPayload());
            Log.i(TAG, "messageArrived:" + arrivedPushMsg + " topic : " + topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained());
            mPushMessageSubject.onNext(arrivedPushMsg);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            Log.i(TAG, "deliveryComplete");
        }

        @Override
        public void connectionLost(Throwable arg0) {
            Log.i(TAG, "connectionLost");
        }
    };

    private boolean isNetConnected() {
        ConnectivityManager connectivityManager = (ConnectivityManager) mContext.getApplicationContext()
                .getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager.getActiveNetworkInfo();
        if (info != null && info.isAvailable()) {
            String name = info.getTypeName();
            Log.d(TAG, "MQTT current net name : " + name);
            return true;
        } else {
            Log.d(TAG, "MQTT cann't connect net");
            return false;
        }
    }
}


MQTT重点是要使用对正确的service地址和topic要订阅成功。
在连接的时候要判断下网络状态,有网再连接。

本文地址:https://blog.csdn.net/dzy_mails/article/details/103427161