欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#的ThreadStart 和 Thread 

程序员文章站 2022-06-02 22:43:04
段时间在使用MQTTnet,都说这个东西比较好,可是翻了翻网上没有例子给参考一下。 今天算是找到了,给高手的帖子做个宣传吧. 原网址如下:https://blog.csdn.net/chenlu5201314/article/details/94740765 由于GitHub上介绍的东西比较少,以我 ......

段时间在使用mqttnet,都说这个东西比较好,可是翻了翻网上没有例子给参考一下。

今天算是找到了,给高手的帖子做个宣传吧.

原网址如下:

由于github上介绍的东西比较少,以我的水平真是不知道怎么用,先照葫芦画瓢,再看看怎么回事吧:

功能:

把订阅与发布做成一个类,还带有自动重连的功能

using system.threading;    
using system.threading.tasks;
using mqttnet;          
using mqttnet.client;      //客户端需要用到
using mqttnet.client.options;  //具体连接时需要用到的属性,id的名称,要连接server的名称,接入时用到的账号和密码,掉线时是否重新清除原有名称,还有许多...
using mqttnet.packets;    //这个没用上
using mqttnet.protocol;    //这个也没用上
using mqttnet.client.receiving;    //接收
using mqttnet.client.disconnecting;  //断线
using mqttnet.client.connecting;    //连接

新建一个类:先写一下变量和一些字段

class hosmqtt   
{        
private static mqttclient mqttclient = null;        
private static imqttclientoptions options = null;        
private static bool runstate = false;        
private static bool running = false;        
 /// <summary>        
/// 服务器ip        
/// </summary>        
private static string serverurl = "182.61.51.85";        
/// <summary>        
/// 服务器端口        
/// </summary>        
private static int port = 61613;        
/// <summary>        
/// 选项 - 开启登录 - 密码        
/// </summary>        
private static string password = "ruichi8888";        
/// <summary>        
/// 选项 - 开启登录 - 用户名        
/// </summary>        
private static string userid = "admin";        
/// <summary>        
/// 主题        
/// <para>china/hunan/yiyang/nanxian</para>        
/// <para>hotel/room01/tv</para>        
/// <para>hospital/dept01/room001/bed001</para>        
/// <para>hospital/#</para>        
/// </summary>        
private static string topic = "china/hunan/yiyang/nanxian";        
/// <summary>        
/// 保留        
/// </summary>        
private static bool retained = false;       
 /// <summary>       
 /// 服务质量        
/// <para>0 - 至多一次</para>        
/// <para>1 - 至少一次</para>        
/// <para>2 - 刚好一次</para>        
/// </summary>        
private static int qualityofservicelevel = 0;
}

先看一下start方法

public static void start()
        {
            try
            {
                runstate = true;
                thread thread = new thread(work);    //原帖中是这样写的 thread thread = new thread(new threadstart( work));
                thread.isbackground = true;
                thread.start();
            }
            catch (exception ex)
            {
                console.writeline( "启动客户端出现问题:" + ex.tostring());
            }
        }

没进入正题之前,先普及一下基本知识 

c#的threadstart 和 thread  多线程,new thread(t1);和new thread(new threadstart(t1));没有什么区别.前者是.net的写法,后者是c#的写法

 

具体请看下面的连接

进入整体,介绍连接方法 work

private static void work()
        {
            running = true;
            console.writeline("work >>begin");
            try
            {
                var factory = new mqttfactory();        //声明一个mqtt客户端的标准步骤 的第一步
                mqttclient = factory.createmqttclient() as mqttclient;  //factory.createmqttclient()实际是一个接口类型(imqttclient),这里是把他的类型变了一下
                options = new mqttclientoptionsbuilder()    //实例化一个mqttclientoptionsbulider
.withtcpserver(serverurl, port) .withcredentials(userid, password) .withclientid("xman") .build();                   mqttclient.connectasync(options);      //连接服务器
        
          //下面这些东西是什么,为什么要这么写,直到刚才我还是不懂,不过在github的网址我发现了出处. mqttclient.connectedhandler = new mqttclientconnectedhandlerdelegate(new func<mqttclientconnectedeventargs, task>(connected)); mqttclient.disconnectedhandler = new mqttclientdisconnectedhandlerdelegate(new func<mqttclientdisconnectedeventargs, task>(disconnected)); mqttclient.applicationmessagereceivedhandler = new mqttapplicationmessagereceivedhandlerdelegate(new action<mqttapplicationmessagereceivedeventargs>(mqttapplicationmessagereceived)); while (runstate) { thread.sleep(100); } } catch(exception exp) { console.writeline(exp); } console.writeline("work >>end"); running = false; runstate = false; }

 

先来看看mqttclient 类里面都有什么东西

C#的ThreadStart 和 Thread 

需要实现的接口,如何实现,说重点!

在github上有个地方进去看看就知道了‘

C#的ThreadStart 和 Thread 

 这个页面的最下方写着如何实现    https://github.com/chkr1011/mqttnet/wiki/upgrading-guide

private void something()
{
    mqttclient.applicationmessagereceivedhandler = new mqttapplicationmessagereceivedhandlerdelegate(onappmessage);
    mqttclient.connectedhandler = new mqttclientconnectedhandlerdelegate(onconnected);
    mqttclient.disconnectedhandler = new mqttclientdisconnectedhandlerdelegate(ondisconnected);
}

private async void onappmessage(mqttapplicationmessagereceivedeventargs e)
{
}

private async void onconnected(mqttclientconnectedeventargs e)
{
}

private async void ondisconnected(mqttclientdisconnectedeventargs e)
{
}

在开始connected方法之前有必要看一下关于同步和异步的知识,

现学现卖简单说一下:

task就是异步的调用,就在不影响主线程运行的另一个线程,但是他能像线程池一样更高效的利用现有的空闲线程

async必须用来修饰task ,void,或者task<tresult>, await是等待异步线程task.run()开始的后台线程执行完毕。

记住要是task 实现异步功能,必须用 async 修饰,且async 与await成对出现。

详见下面大神写的大作:

下面是什么意思?

mqttclient.connectedhandler = new mqttclientconnectedhandlerdelegate(new func<mqttclientconnectedeventargs, task>(connected));

 

mqttclientconnectedhandlerdelegate 这个实例实现了mqttclient.connectedhandler接口

new func<mqttclientconnectedeventargs, task>(connected) ,

使用func委托传入mqttclientconnectedeventargs类型的参数,返回的类型是task,task是一个类,这个类没有返回值,如果有返回值就是task<tresult>。

是委托就要带一个方法取实现,这个方法就是connected。

这句话的意思是,用mqttclientconnectedhandlerdelegate实现接口,同时使用委托取调用connected的方法,并且给这个方法传入一个mqttclientconnectedeventargs参数,

这个委托的返回值是task(就是不需要返回类型的异步调用),这也就定义了connected的类型必须是async task。

好了来看下 connected,这个函数什么意思

就是与服务器连接之后要干什么,订阅一个topic,或几个topic。连接之前已经连接了connectasync(),如果断线还会重连,后面会提到。

这个就连接之后需要做的事----订阅!

        private static async task connected(mqttclientconnectedeventargs e)
        {
            try
            {
                list<topicfilter> listtopic = new list<topicfilter>();
                if (listtopic.count() <= 0)
                {
                    var topicfilterbulder = new topicfilterbuilder().withtopic(topic).build();
                    listtopic.add(topicfilterbulder);
                    console.writeline("connected >>subscribe " + topic);
                }                await mqttclient.subscribeasync(listtopic.toarray());
                console.writeline("connected >>subscribe success");
            }
            catch (exception exp)
            {
                console.writeline(exp.message);
            }
        }

topicfilter是一个topic详细信息的类

C#的ThreadStart 和 Thread 

 

 掉线的发生时会执行这个函数

private static async task disconnected(mqttclientdisconnectedeventargs e)
        {
            try
            {
                console.writeline("disconnected >>disconnected server");
                await task.delay(timespan.fromseconds(5));
                try
                {
                    await mqttclient.connectasync(options);
                }
                catch (exception exp)
                {
                    console.writeline("disconnected >>exception " + exp.message);
                }
            }
            catch (exception exp)
            {
                console.writeline(exp.message);
            }
        }

越写问题越多,这个为什么断线的时候会执行这个方法,这不是事件,只是接口!

怎么实现的?看了一下源码,一时只看了大概,这些功能的绑定都是在connectasync的时候就完成了!

 

下面接收到消息的时候

        /// <summary>
        /// 接收消息触发事件
        /// </summary>
        /// <param name="e"></param>
        private static void mqttapplicationmessagereceived(mqttapplicationmessagereceivedeventargs e)
        {
            try
            {
                string text = encoding.utf8.getstring(e.applicationmessage.payload);
                string topic = e.applicationmessage.topic; string qos = e.applicationmessage.qualityofservicelevel.tostring();
                string retained = e.applicationmessage.retain.tostring();
                console.writeline("messagereceived >>topic:" + topic + "; qos: " + qos + "; retained: " + retained + ";");
                console.writeline("messagereceived >>msg: " + text);
            }
            catch (exception exp)
            {
                console.writeline(exp.message);
            }
        }

 

 最后就是发布:一般会选择0,如果选择其他的情况在订阅端不在的时候,服务器可能会崩溃

/// <summary>       
        /// /// 发布        
        /// <paramref name="qos"/>        
        /// <para>0 - 最多一次</para>        
        /// <para>1 - 至少一次</para>        
        /// <para>2 - 仅一次</para>        
        /// </summary>       
        /// <param name="topic">发布主题</param>        
        /// <param name="message">发布内容</param>        
        /// <returns></returns>        
        public static void publish( string topic,string message)
        {
            try
            {
                if (mqttclient == null)
                    return;
                if (mqttclient.isconnected == false)
                    mqttclient.connectasync(options);
                if (mqttclient.isconnected == false)
                {
                    console.writeline("publish >>connected failed! ");
                    return;
                }
                console.writeline("publish >>topic: " + topic + "; qos: " + qualityofservicelevel + "; retained: " + retained + ";");
                console.writeline("publish >>message: " + message);
                mqttapplicationmessagebuilder mamb = new mqttapplicationmessagebuilder()                 
                    .withtopic(topic)                 
                    .withpayload(message).withretainflag(retained);
                if (qualityofservicelevel == 0)
                {
                    mamb = mamb.withatmostonceqos();
                }
                else if (qualityofservicelevel == 1)
                {
                    mamb = mamb.withatleastonceqos();
                }
                else if (qualityofservicelevel == 2)
                {
                    mamb = mamb.withexactlyonceqos();
                }
                mqttclient.publishasync(mamb.build());
            }
            catch (exception exp)
            {
                console.writeline("publish >>" + exp.message);
            }
        }

 

 

 

 纸上得来终觉浅,要改造成自己想要的些东西,还要花些功夫!不过这已经很好了!谢谢各位高手的贡献