段时间在使用MQTTnet,都说这个东西比较好,但是翻了翻网上没有例子给参考一下。html
今天算是找到了,给高手的帖子作个宣传吧.git
原网址以下:https://blog.csdn.net/chenlu5201314/article/details/94740765github
因为GitHub上介绍的东西比较少,以个人水平真是不知道怎么用,先照葫芦画瓢,再看看怎么回事吧:服务器
功能:多线程
把订阅与发布作成一个类,还带有自动重连的功能异步
using System.Threading; using System.Threading.Tasks; using MQTTnet; using MQTTnet.Client; //客户端须要用到 using MQTTnet.Client.Options; //具体链接时须要用到的属性,ID的名称,要链接Server的名称,接入时用到的帐号和密码,掉线时是否从新清除原有名称,还有许多... using MQTTnet.Packets; //这个没用上 using MQTTnet.Protocol; //这个也没用上 using MQTTnet.Client.Receiving; //接收 using MQTTnet.Client.Disconnecting; //断线 using MQTTnet.Client.Connecting; //链接
新建一个类:先写一下变量和一些字段async
class HOSMQTT { private static MqttClient mqttClient = null; private static IMqttClientOptions options = null; private static bool runState = false; private static bool running = false; /// <summary> /// 服务器IP /// </summary> private static string ServerUrl = "182.61.51.85"; /// <summary> /// 服务器端口 /// </summary> private static int Port = 61613; /// <summary> /// 选项 - 开启登陆 - 密码 /// </summary> private static string Password = "ruichi8888"; /// <summary> /// 选项 - 开启登陆 - 用户名 /// </summary> private static string UserId = "admin"; /// <summary> /// 主题 /// <para>China/Hunan/Yiyang/Nanxian</para> /// <para>Hotel/Room01/Tv</para> /// <para>Hospital/Dept01/Room001/Bed001</para> /// <para>Hospital/#</para> /// </summary> private static string Topic = "China/Hunan/Yiyang/Nanxian"; /// <summary> /// 保留 /// </summary> private static bool Retained = false; /// <summary> /// 服务质量 /// <para>0 - 至多一次</para> /// <para>1 - 至少一次</para> /// <para>2 - 恰好一次</para> /// </summary> private static int QualityOfServiceLevel = 0; }
先看一下Start方法ide
public static void Start() { try { runState = true; Thread thread = new Thread(Work); //原帖中是这样写的 Thread thread = new Thread(new ThreadStart( Work)); thread.IsBackground = true; thread.Start(); } catch (Exception ex) { Console.WriteLine( "启动客户端出现问题:" + ex.ToString()); } }
没进入正题以前,先普及一下基本知识 函数
具体请看下面的链接post
http://www.javashuo.com/article/p-avsvybev-dt.html
进入总体,介绍链接方法 Work
private static void Work() { running = true; Console.WriteLine("Work >>Begin"); try { var factory = new MqttFactory(); //声明一个MQTT客户端的标准步骤 的第一步 mqttClient = factory.CreateMqttClient() as MqttClient; //factory.CreateMqttClient()实际是一个接口类型(IMqttClient),这里是把他的类型变了一下 options = new MqttClientOptionsBuilder() //实例化一个MqttClientOptionsBulider
.WithTcpServer(ServerUrl, Port) .WithCredentials(UserId, Password) .WithClientId("XMan") .Build(); mqttClient.ConnectAsync(options); //链接服务器
//下面这些东西是什么,为何要这么写,直到刚才我仍是不懂,不过在GitHub的网址我发现了出处. mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func<MqttClientConnectedEventArgs, Task>(Connected)); mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Func<MqttClientDisconnectedEventArgs, Task>(Disconnected)); mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(MqttApplicationMessageReceived)); while (runState) { Thread.Sleep(100); } } catch(Exception exp) { Console.WriteLine(exp); } Console.WriteLine("Work >>End"); running = false; runState = false; }
先来看看MqttClient 类里面都有什么东西
须要实现的接口,如何实现,说重点!
在GitHub上有个地方进去看看就知道了‘
这个页面的最下方写着如何实现 https://github.com/chkr1011/MQTTnet/wiki/Upgrading-guide
private void Something() { mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnAppMessage); mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected); mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected); } private async void OnAppMessage(MqttApplicationMessageReceivedEventArgs e) { } private async void OnConnected(MqttClientConnectedEventArgs e) { } private async void OnDisconnected(MqttClientDisconnectedEventArgs e) { }
在开始Connected方法以前有必要看一下关于同步和异步的知识,
现学现卖简单说一下:
Task就是异步的调用,就在不影响主线程运行的另外一个线程,可是他能像线程池同样更高效的利用现有的空闲线程
async必须用来修饰Task ,void,或者Task<TResult>, await是等待异步线程Task.Run()开始的后台线程执行完毕。
记住要是Task 实现异步功能,必须用 async 修饰,且async 与await成对出现。
详见下面大神写的大做:http://www.javashuo.com/article/p-pjqezvdb-ke.html
下面是什么意思?
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func<MqttClientConnectedEventArgs, Task>(Connected));
MqttClientConnectedHandlerDelegate 这个实例实现了mqttClient.ConnectedHandler接口
new Func<MqttClientConnectedEventArgs, Task>(Connected) ,
使用Func委托传入MqttClientConnectedEventArgs类型的参数,返回的类型是Task,Task是一个类,这个类没有返回值,若是有返回值就是Task<TResult>。
是委托就要带一个方法取实现,这个方法就是Connected。
这句话的意思是,用MqttClientConnectedHandlerDelegate实现接口,同时使用委托取调用Connected的方法,而且给这个方法传入一个MqttClientConnectedEventArgs参数,
这个委托的返回值是Task(就是不须要返回类型的异步调用),这也就定义了Connected的类型必须是async Task。
好了来看下 Connected,这个函数什么意思
就是与服务器链接以后要干什么,订阅一个Topic,或几个Topic。链接以前已经链接了Connectasync(),若是断线还会重连,后面会提到。
这个就链接以后须要作的事----订阅!
private static async Task Connected(MqttClientConnectedEventArgs e) { try { List<TopicFilter> listTopic = new List<TopicFilter>(); if (listTopic.Count() <= 0) { var topicFilterBulder = new TopicFilterBuilder().WithTopic(Topic).Build(); listTopic.Add(topicFilterBulder); Console.WriteLine("Connected >>Subscribe " + Topic); } await mqttClient.SubscribeAsync(listTopic.ToArray()); Console.WriteLine("Connected >>Subscribe Success"); } catch (Exception exp) { Console.WriteLine(exp.Message); } }
TopicFilter是一个Topic详细信息的类
掉线的发生时会执行这个函数
private static async Task Disconnected(MqttClientDisconnectedEventArgs e) { try { Console.WriteLine("Disconnected >>Disconnected Server"); await Task.Delay(TimeSpan.FromSeconds(5)); try { await mqttClient.ConnectAsync(options); } catch (Exception exp) { Console.WriteLine("Disconnected >>Exception " + exp.Message); } } catch (Exception exp) { Console.WriteLine(exp.Message); } }
越写问题越多,这个为何断线的时候会执行这个方法,这不是事件,只是接口!
怎么实现的?看了一下源码,一时只看了大概,这些功能的绑定都是在ConnectAsync的时候就完成了!
下面接收到消息的时候
/// <summary> /// 接收消息触发事件 /// </summary> /// <param name="e"></param> private static void MqttApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { try { string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); string Topic = e.ApplicationMessage.Topic; string QoS = e.ApplicationMessage.QualityOfServiceLevel.ToString(); string Retained = e.ApplicationMessage.Retain.ToString(); Console.WriteLine("MessageReceived >>Topic:" + Topic + "; QoS: " + QoS + "; Retained: " + Retained + ";"); Console.WriteLine("MessageReceived >>Msg: " + text); } catch (Exception exp) { Console.WriteLine(exp.Message); } }
最后就是发布:通常会选择0,若是选择其余的状况在订阅端不在的时候,服务器可能会崩溃
/// <summary> /// /// 发布 /// <paramref name="QoS"/> /// <para>0 - 最多一次</para> /// <para>1 - 至少一次</para> /// <para>2 - 仅一次</para> /// </summary> /// <param name="Topic">发布主题</param> /// <param name="Message">发布内容</param> /// <returns></returns> public static void Publish( string Topic,string Message) { try { if (mqttClient == null) return; if (mqttClient.IsConnected == false) mqttClient.ConnectAsync(options); if (mqttClient.IsConnected == false) { Console.WriteLine("Publish >>Connected Failed! "); return; } Console.WriteLine("Publish >>Topic: " + Topic + "; QoS: " + QualityOfServiceLevel + "; Retained: " + Retained + ";"); Console.WriteLine("Publish >>Message: " + Message); MqttApplicationMessageBuilder mamb = new MqttApplicationMessageBuilder() .WithTopic(Topic) .WithPayload(Message).WithRetainFlag(Retained); if (QualityOfServiceLevel == 0) { mamb = mamb.WithAtMostOnceQoS(); } else if (QualityOfServiceLevel == 1) { mamb = mamb.WithAtLeastOnceQoS(); } else if (QualityOfServiceLevel == 2) { mamb = mamb.WithExactlyOnceQoS(); } mqttClient.PublishAsync(mamb.Build()); } catch (Exception exp) { Console.WriteLine("Publish >>" + exp.Message); } }
纸上得来终觉浅,要改形成本身想要的些东西,还要花些功夫!不过这已经很好了!谢谢各位高手的贡献