欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C# Mqtt 断线重连

程序员文章站 2022-04-29 14:25:34
在通过 MqttClient 客户端连接之后,在服务端服务重启时,客户端如果没有重连机制,则无法再接收到订阅的消息。 使用的 Mqtt 组件为:M2Mqtt.Net.dll 一些特性发现 (1)如果提供的服务端地址是不可解析的,会引发异常无法实例化 MqttClient 对象。 (2)Connect ......

在通过 mqttclient 客户端连接之后,在服务端服务重启时,客户端如果没有重连机制,则无法再接收到订阅的消息。

使用的 mqtt 组件为:m2mqtt.net.dll

一些特性发现

(1)如果提供的服务端地址是不可解析的,会引发异常无法实例化 mqttclient 对象。
(2)connect 无法连接时会引发异常,isconnected 为 false。
(3)服务端断开会触发客户端的 connectionclosed 事件,isconnected 为 false。
(4)重新 connect 需要重新 subscribe 订阅主题。
(5)mqttclient.subscribe 第一个参数为订阅主题数组,第二个为相应的 qoslevel,两个数组长度必须一致,否则会异常。

重连流程控制

C# Mqtt 断线重连

主要代码实现

(1)线程主体

// 自动重连主体
private void _trycontinueconnect()
{
    if (isconnected) return;

    thread retrythread = new thread(new threadstart(delegate
    {
        while (_mqttclient == null || !_mqttclient.isconnected)
        {
            if (_toclose) break;

            if (_mqttclient == null)
            {
                _buildclient();
                thread.sleep(3000);
                continue;
            }

            try
            {
                _trycount++;
                _connect(); 
            }
            catch (exception ce)
            {
                debug.writeline("re connect exception:" + ce.message);
            }

            // 如果还没连接不符合结束条件则睡2秒
            if (!_mqttclient.isconnected)
            {
                thread.sleep(2000);
            }
        }
    }));
            
    retrythread.start();
}

(2)实例化部分

// 实例化客户端
private void _buildclient()
{
    try
    {
        _mqttclient = new mqttclient(_mqttserver);
    }
    catch (exception e)
    {
        debug.writeline("build client error:" + e.message);
        return;
    }

    // 消息到达事件绑定
    _mqttclient.mqttmsgpublishreceived += client_mqttmsgpublishreceived;

    // 连接断开事件绑定
    _mqttclient.connectionclosed += (sender, e) =>
    {
        if (!_toclose)
        {
            // 尝试重连
            _trycontinueconnect();
        }
    };
}

(3)尝试连接部分

// 发起一次连接,连接成功则订阅相关主题 
private void _connect()
{
    if (string.isnullorempty(_mqttusername))
    {
        var b = _mqttclient.connect(_mqttclientid);
    }
    else
    {
        var b = _mqttclient.connect(_mqttclientid, _mqttusername, _mqttuserpass);
    } 

    if (_mqttclient.isconnected)
    {
        _mqttclient.subscribe(new string[] { "topic1", "topic2" },
            new byte[] { mqttmsgbase.qos_level_at_most_once, mqttmsgbase.qos_level_at_most_once });
    }
}

实测效果不错,其中延时时间可以适当调整。