欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Android开发:MQTT-Mosquitto的基本搭建和使用

程序员文章站 2022-04-29 08:10:26
...

Mosquitto搭建参考文章:https://blog.csdn.net/pgpanda/article/details/51800865
Android MQTT参考文章:https://blog.csdn.net/qq_26629729/article/details/83987974

声明:自学不久的Android,以下都是的Mosquitto搭建及代码的编写都是参考上面两篇文章加自己的理解,非完全正确,很多地方还没有理解,只是基本的实现了消息的推送,如果有错误的地方可以给我留言,多多包涵

Mosquitto下载

链接:http://pan.baidu.com/s/1o8Swnv8 密码:si95
这也是参考文章里面抄过来的,哈哈哈哈

Mosquitto的安装和简单的使用

下载完之后是个可直接运行的.exe的可执行程序,直接运行就好了,过程如下:
Android开发:MQTT-Mosquitto的基本搭建和使用Android开发:MQTT-Mosquitto的基本搭建和使用Android开发:MQTT-Mosquitto的基本搭建和使用
安装完之后可以在安卓目录里面找到自动生成的
mosquitto文件夹,打开文件夹,如下:
Android开发:MQTT-Mosquitto的基本搭建和使用
在路径栏直接输入cmd回车,打开cmd窗口:Android开发:MQTT-Mosquitto的基本搭建和使用
在cmd里面输入mosquitto 开启服务,(按下回车开启服务不会出现任何东西)
Android开发:MQTT-Mosquitto的基本搭建和使用
然后再在从mosquitto文件夹打开一个新cmd窗口,输入mosquitto_sub -v -t TestTopic(订阅一个叫TestTopic的主题)
Android开发:MQTT-Mosquitto的基本搭建和使用
然后继续还是从mosquitto文件夹打开一个新cmd窗口,输入mosquitto_pub -t TestTopic -m TestMessage(向主题TestTopic发布一个内容为TestMessage的消息),接着我们看回上一个cmd窗口,会发现收到了一个内容为TestMessage的消息
Android开发:MQTT-Mosquitto的基本搭建和使用
Android开发:MQTT-Mosquitto的基本搭建和使用
每次输入完命令记得回车执行;
到这里我们的Mosquitto已经搭建好了,关于Mosquitto的配置这里就不多讲了(因为我也没了解太深入)大家去百度,接下来就是Android code部分了

Android通过MQTT+Mosquitto实现通知推送

一:首先在AndroidManifest.xml文件里面加上我们需要的权限

<!--权限 -->
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<!--注册Service和广播 -->
<service android:name="org.eclipse.paho.android.service.MqttService" /> <!--MqttService -->
        <service android:name="tool.MyMqttService" /><!-- MqttService -->

        <receiver
            android:name="tool.MyReceiver"
            android:enabled="true"
            android:exported="false"
            >
            <intent-filter>
                <category android:name="tool" />
                <action android:name="android.intent.action.BOOT_COMPLETED" />
                <action android:name="android.intent.action.USER_PRESENT" />
                <action android:name="tool.myreceiver" />
            </intent-filter>
        </receiver>

**二:在app的build.gradle文件加上

repositories {
    maven {
        url "https://repo.eclipse.org/content/repositories/paho-releases/"
    }
}
dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

最后怕小白们看不懂(我也是小白),贴个项目图:
Android开发:MQTT-Mosquitto的基本搭建和使用
下面贴上MyMqttService服务类的代码编写:


import android.app.Notification;
import android.app.NotificationManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Build;
import android.os.IBinder;
import android.support.annotation.Nullable;
import android.support.v4.app.NotificationCompat;
import android.util.Log;

import com.example.Entity.MQTTConMessage;
import com.example.Entity.MQTTMessage;
import com.example.w9.HomeActivity;
import com.example.w9.R;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MyMqttService extends Service {
    //通知
    private Notification notification;
    private NotificationManager manager;
    private int i = 1;

    public static final String TAG = "MyMqttService";

    //封装好的MQTTClient供操作,发送和接手等设置都用它
    private static MqttAndroidClient client;
    //MQTT连接参数设置
    private MqttConnectOptions conOpt;

    /**
    *Mosquitto连接参数
    *host = tcp://加ip地址+端口号
    *userName = mosquitto (默认)
    *passWord = "" (默认)
    **/
    private String host = "tcp://192.168.0.1:1883";
    private String userName = "mosquitto";
    private String passWord ="";

    //clientId必须是唯一,用于订阅主题
    private String clientId ="39f06ced-edc5-490b-bb15-b52ec93b8b0d";

    private MyReceiver myReceiver;
    private static boolean isCloseService=false;

    @Override
    public void onCreate() {
        super.onCreate();

    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        init();
        return super.onStartCommand(intent, flags, startId);
    }

    /**
     * 发布主题消息:
     * qos:服务质量
     * qos=0"至多一次",消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。
     * qos=1"至少一次",确保消息到达,但消息重复可能会发生。
     * qos=2"只有一次",确保消息到达一次。
     * retained是否在服务器保留断开连接后的最后一条消息
     * msg:消息内容
     * **/
    public static void publish(String msg,String topic){

        //   String topic = myTopic;
        Integer qos = 2;
        Boolean retained = false;
        try {
            client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 订阅主题
     *
     * **/
    public static boolean subscribe(String[] topicName, int qos[]){
        boolean flag = false;
        if (client != null && client.isConnected()) {
            try {
                /**
                 * 订阅(订阅话题和服务质量以数组方式传递,
                 * 订阅话题:String arr []={"global_notification"};
                 * 服务质量int qos []={2};)
                 **/
                client.subscribe(topicName, qos, null, new IMqttActionListener() {
                    //订阅成功
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        //Log.e("Subscribed","Subscribed");
                    }
                    //订阅失败
                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        //Log.e("Failed to subscribe","Failed to subscribe");
                    }
                });

                flag = true;
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
        else {

        }
        return flag;
    }

    private void init() {
        // 服务器地址(协议+地址+端口号)
        String uri = host;

        client = new MqttAndroidClient(getApplicationContext(), uri, clientId);

        // 设置MQTT监听并且接受消息
        client.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                //断开连接必须重新订阅才能收到消息
                if(reconnect){
                    //需要订阅的主题
                    String arr []={"topicTest01","LXL"};
                    //服务质量
                    int qos []={2,2};
                    //执行订阅方法
                    subscribe(arr,qos);
                }
            }

            @Override
            public void connectionLost(Throwable cause) {
                Log.e("连接失败", "重连"+cause);
                doClientConnection();
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                String notificationMsg = new String(message.getPayload(), "UTF-8");
                manager = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);
                // 初始化
                Intent intent = new Intent(MyMqttService.this, HomeActivity.class);
                PendingIntent pendingIntent = PendingIntent.getActivity(MyMqttService.this, 0, intent, 0);
                notification = new Notification.Builder(MyMqttService.this)
                        .setSmallIcon(R.drawable.logo)
                        //设置通知栏标题
                        .setContentTitle("百智通知")
                        //设置通知栏显示内容
                        .setContentText(notificationMsg)
                        //设置通知栏点击意图
                        .setContentIntent(pendingIntent)
                        //通知首次出现在通知栏,带上升动画效果的
                        .setTicker("测试通知来啦")
                        //通知产生的时间,会在通知信息里显示,一般是系统获取到的时间
                        .setWhen(System.currentTimeMillis())
                        //设置这个标志当用户单击面板就可以让通知将自动取消
                        .setAutoCancel(true)
                        //设置该通知优先级
                        .setPriority(Notification.PRIORITY_MAX)
                        //向通知添加声音、闪灯和振动效果的最简单、最一致的方式是使用当前的用户默认设置,使用defaults属性DEFAULT_VIBRATE
                        .setDefaults(Notification.DEFAULT_VIBRATE)
                        .build();

                // 向系统发送通知
                try{
                    manager.notify(i++, notification);
                }catch (Exception e) {
                    int o = 0;
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });

        conOpt = new MqttConnectOptions();
        conOpt.setAutomaticReconnect(true);
        // 清除缓存
        conOpt.setCleanSession(false);
        // 设置超时时间,单位:秒
        conOpt.setConnectionTimeout(10);
        // 心跳包发送间隔,单位:秒
        conOpt.setKeepAliveInterval(20);
        // 用户名
        conOpt.setUserName(userName);
        // 密码
        conOpt.setPassword(passWord.toCharArray());

        //动态注册广播,也可以在AndroidManifest.xml文件注册广播
        myReceiver = new MyReceiver();
        IntentFilter filter = new IntentFilter();
        filter.addAction(ConnectivityManager.CONNECTIVITY_ACTION);
        registerReceiver(myReceiver, filter);

    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        //注销广播
        unregisterReceiver(myReceiver);

        try {
            if(client!=null){
                //断开连接
                client.disconnect();
                //关闭释放资源
                client.unregisterResources();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }

        if(isCloseService){
            isCloseService=false;
            //完全退出
            Log.e("完全退出","true");
        }else{
            //服务停止,重新开启服务。
            Log.e("服务已被杀死","false");
            stopForeground(true);
            Intent intent = new Intent("tool.myreceiver");
            sendBroadcast(intent);
        }

    }
    public static void closeConnect(){
        isCloseService=true;
    }

    /** 连接MQTT服务器
     * client.isConnected : client是否在线 (true,false)
     * */
    private void doClientConnection() {

        if (!client.isConnected() && isConnectIsNomarl()) {
            try {
                client.connect(conOpt, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        //需要订阅的主题
                        String arr []={"topicTest01","LXL"};
                        //服务质量
                        int qos []={2,2};
                        //执行订阅方法
                        subscribe(arr,qos);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable arg1) {
                        //在命令行打印异常信息在程序中出错的位置及原因
                        arg1.printStackTrace();
                    }
                });
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

    }

    /** 判断网络是否连接 */
    private boolean isConnectIsNomarl() {
        ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager.getActiveNetworkInfo();

        if (info != null && info.isConnected()) {
            String name = info.getTypeName();
            Log.e(TAG, "MQTT当前网络名称:" + name);
            return true;
        } else {
            Log.e(TAG, "MQTT 没有可用网络");
            return false;
        }
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    private class MyReceiver extends BroadcastReceiver {
        @Override
        public void onReceive(Context context, Intent intent) {
            if (!isConnectIsNomarl()) {
                Log.e("网络错误","网络错误");
            } else {
                doClientConnection();
            }
        }
    }
}

编写MyReceiver继承BroadcastReceiver

public class MyReceiver extends BroadcastReceiver {

    @Override
    public void onReceive(Context context, Intent intent) {
        if(intent.getAction().equals("tool.myreceiver")){
            Log.e("MyReceiver","start");
            Intent sevice = new Intent(context, MyMqttService.class);
            context.startService(sevice);
        }
    }
}

最后在Activity的onCreate方法中开启服务就好了:

protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_home);

        //开启MQTT
        startService(new Intent(this, MyMqttService.class));

    }

到这里,Android+MQTT +Mosquitto从搭建到代码编写已经结束了,如果有不正确的地方请指出,谢谢。再次感谢上面两篇参考文章的博主