mqttnet消息推送与接收

创建windows服务网上有很多,不多述;

服务端做好后一定要写bat安装卸载文件

install.bat

@echo.请稍等,MqttNetServiceAddUserAndPassword服务安装启动中............
@echo off
@title 安装windows服务:MqttNetServiceAddUserAndPassword
@sc create MqttNetServiceAddUserAndPassword binPath="%~dp0\MqttNetServiceAddUserAndPassword.exe"
@sc config MqttNetServiceAddUserAndPassword start= auto
@sc start MqttNetServiceAddUserAndPassword
@echo.MqttNetServiceAddUserAndPassword启动完毕
pause

//binPath="%~dp0\MqttNetServiceAddUserAndPassword.exe"   当前路径,也可指定



delete.bat

@echo.服务MqttNetServiceAddUserAndPassword卸载中..........
@echo off
@sc stop MqttNetServiceAddUserAndPassword
@sc delete MqttNetServiceAddUserAndPassword
@echo off
@echo.MqttNetServiceAddUserAndPassword卸载完毕
@pause


服务端:

using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;


namespace MqttNetServiceAddUserAndPassword
{
    public partial class Service1 : ServiceBase
    {
        private readonly static object locker = new object();
        private MqttServer mqttServer = null;
        private System.Timers.Timer timer = null;


        private GodSharp.Sockets.SocketServer socketService = null;


 


        //此集合用于判断写入日志在一段时间内不重,以客户端id为依据,最多2000个清零;
        private List<string> subClientIDs = new List<string>();
        public Service1()
        {
            InitializeComponent();
            //创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中
            timer = new System.Timers.Timer();
            timer.AutoReset = true;
            timer.Enabled = true;
            timer.Interval = 5000;
            timer.Elapsed += new ElapsedEventHandler(GetSubClientSAndSetShow);




        }


        protected override void OnStart(string[] args)
        {
            //开启服务
            //CreateMQTTServer();


            Task.Run(CreateMQTTServer);




            if (timer.Enabled == false)
            {
                timer.Enabled = true;
                timer.Start();
            }
            //创建socket服务端
            //CreateServerSocket();
        //    SocketServer.StartSocketService();
        }


        protected override void OnStop()
        {
            if (timer.Enabled == true)
            {
                timer.Enabled = false;
                timer.Stop();
            }
        }
        /// <summary>
        /// 开启服务
        /// </summary>
        private async Task CreateMQTTServer()
        {
            if (mqttServer == null)
            {
                var optionsBuilder = new MqttServerOptionsBuilder();
                optionsBuilder.WithConnectionValidator(c =>
                {
                    if (c.ClientId.Length < 5 || !c.ClientId.StartsWith("Eohi_"))
                    {
                        c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                        return;
                    }


                    if (c.Username != "user" || c.Password != "123456")
                    {
                        c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                        return;
                    }
                    c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
                });
                //指定 ip地址,默认为本地,但此方法不能使用ipaddress报错,有哪位大神帮解答,感激。
                //options.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(""))
                //指定端口
                optionsBuilder.WithDefaultEndpointPort(1884);
                //连接记录数,默认 一般为2000
                //optionsBuilder.WithConnectionBacklog(2000);
                mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
                string msg = null;
             
                //将发送的消息加到日志                      
                mqttServer.ApplicationMessageReceived += (s, e) =>
                {
                    msg = @"发送消息的客户端id:" + e.ClientId + "\r\n"
                  + "发送时间:" + DateTime.Now + "\r\n"
                  + "发送消息的主题:" + e.ApplicationMessage.Topic + "\r\n"
                 + "发送的消息内容:" + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + "\r\n"
                 + "--------------------------------------------------\r\n"
                 ;
                    WriteMsgLog(msg);
                };
                await mqttServer.StartAsync(optionsBuilder.Build());


            }
        }
        #region 记录日志  
        /// <summary>  
        /// 消息记录日志  
        /// </summary>  
        /// <param name="msg"></param>  
        private void WriteMsgLog(string msg)
        {


            //string path = @"C:\log.txt";  


            //该日志文件会存在windows服务程序目录下  
            string path = AppDomain.CurrentDomain.BaseDirectory + "\\Msglog.txt";
            FileInfo file = new FileInfo(path);
            if (!file.Exists)
            {
                FileStream fs;
                fs = File.Create(path);
                fs.Close();
            }
            using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))
            {
                using (StreamWriter sw = new StreamWriter(fs))
                {
                    sw.WriteLine(DateTime.Now.ToString() + "   " + msg);
                }
            }
        }
        private void PubMessage(string topic, string msg)
        {
            if (mqttServer != null)
            {
                lock (locker)
                {
                    var message = new MqttApplicationMessageBuilder();
                    message.WithTopic(topic);
                    message.WithPayload(msg);
                    mqttServer.PublishAsync(message.Build());
                }
            }
        }
        /// <summary>
        ///客户端链接日志           客户端接入
        /// </summary>
        /// <param name="msg"></param>
        private void WriteClientLinkLog(string msg)
        {
            //string path = @"C:\log.txt";  


            //该日志文件会存在windows服务程序目录下  
            string path = AppDomain.CurrentDomain.BaseDirectory + "\\ClientLinklog.txt";
            FileInfo file = new FileInfo(path);
            if (!file.Exists)
            {
                FileStream fs;
                fs = File.Create(path);
                fs.Close();
            }


            using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))
            {
                using (StreamWriter sw = new StreamWriter(fs))
                {
                    sw.WriteLine(msg);
                }
            }
        }
        /// <summary>
        /// 通过定时器将客户端链接信息写入日志      
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)
        {
            // List<SetServiceM> dic = new List<SetServiceM>();   
            if (mqttServer != null)
            {
                List<ConnectedMqttClient> subclients = mqttServer.GetConnectedClientsAsync().Result.ToList();
                if (subclients.Count > 0)
                {
                    string subclientcount = @"客户端接入的总数为:" + (subclients.Count - 1).ToString() + "\r\n"
                        + "------------------------------------------------------- \r\n";
                    WriteClientLinkLog(subclientcount);
                    PubMessage("ClientsCount", (subclients.Count - 1).ToString());
                    List<string> clientids = new List<string>();
                    //连接客户端的个数
                    //   dic.Add(SetServiceM.SetService( "ClientCount", subclients.Count.ToString()));
                    //   var dicclientlink = new Dictionary<string, string>();


                    foreach (var item in subclients)
                    {
                        if (!subClientIDs.Contains(item.ClientId))
                        {
                            subClientIDs.Add(item.ClientId);
                            string msg = @"连接客户端ID:" + item.ClientId + "\r\n"
                            + "连接时间:" + DateTime.Now + "\r\n"
                            + "协议版本:" + item.ProtocolVersion + "\r\n"
                            + "最后收到的非保持活包:" + item.LastNonKeepAlivePacketReceived + "\r\n"
                            + "最后收到的包:" + item.LastPacketReceived + "\r\n"
                            + "挂起的应用程序消息:" + item.PendingApplicationMessages + "\r\n"
                            + "------------------------------------------------" + "\r\n";
                            WriteClientLinkLog(msg);
                            PubMessage("clientlink", msg);
                            //    mqttServer.PublishAsync("clientlink", msg);
                            //    dicclientlink.Add(item.ClientId, msg);
                        }
                        clientids.Add(item.ClientId);
                    }
                    if (subClientIDs.Count >= 2000)
                    {
                        subClientIDs.Clear();
                    }
                    var exceptlist = subClientIDs.Except(clientids).ToList();
                    //  var dicclientoutline = new Dictionary<string, string>();
                    if (exceptlist.Count > 0)
                    {


                        exceptlist.ForEach(u =>
                        {
                            string msgoutline = @"客户端下线ID:" + u + "\r\n"
                       + "客户端下线时间:" + DateTime.Now.ToString() + "\r\n"
                       + "------------------------------------------------------------ \r\n"
                       ;
                            WriteClientLinkLog(msgoutline);
                            subClientIDs.Remove(u);
                            PubMessage("clientlink", msgoutline);
                        //     mqttServer.PublishAsync("clientlink", msgoutline);
                        // dicclientoutline.Add("OutLineID_" + u, msgoutline);
                    });
                    }
                    ////连接客户端的id
                    //dic.Add(SetServiceM.SetService("clientlink", JsonConvert.SerializeObject(dicclientlink)));
                    ////客户端下线的时间
                    //dic.Add(SetServiceM.SetService("clientoutline", JsonConvert.SerializeObject(dicclientoutline)));
                    //SocketServer.connection.Send(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dic)));
                }
                else
                {
                    string subclientcount = @"暂无客户端接入!" + "\r\n"
                     + "-------------------------------------------------------- \r\n";
                    WriteClientLinkLog(subclientcount);
                }
            }
        }
        /// <summary>
        /// 客户端下线时间
        /// </summary>
        /// <param name="msg"></param>
        public void WriteClientOutLineLog(string msg)
        {
            string path = AppDomain.CurrentDomain.BaseDirectory + "\\ClientOutLineLog.txt";
            FileInfo file = new FileInfo(path);
            if (!file.Exists)
            {
                FileStream fs = File.Create(path);
                fs.Close();
            }
            using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))
            {
                using (StreamWriter sw = new StreamWriter(fs))
                {
                    sw.WriteLine(msg);
                }
            }
        }
        //windows服务里的服务端
        private void CreateServerSocket()
        {
            if (socketService == null)
            {
                // IPEndPoint ipep = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001);
                socketService = new GodSharp.Sockets.SocketServer("127.0.0.1", 9001, ProtocolType.Tcp);  //Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                socketService.Start();
                socketService.Listen(10);
                Thread thread = new Thread(new ThreadStart(new Action(() =>
                {
                    while (true)
                    {
                    //  socketClient = socketService.Clients[0];
                    // string data = "sql|" ; //在这里封装数据,通常是自己定义一种数据结构,如struct data{sql;result}
                    // client.Send(Encoding.Default.GetBytes(msg));
                }
                })));
            }
            else
            {
                CreateServerSocket();
            }
        }


        #endregion
    }

}


服务端桌面显示程序:

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.Drawing;
using System.Linq;
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.ServiceProcess;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;


namespace MQTTNetFrm
{
    public partial class Form1 : Form
    {
        private ServiceController ServiceController = null;
        private MqttClientOptions options = null;
        private MqttClient mqttClient = null;






        public Form1()
        {
            InitializeComponent();


   
            new Thread(new ThreadStart(GetServiceStatus)).Start();


          Task.Run(LinkClientService).Wait();
        }
        /// <summary>
        /// 获取当前ip地址
        /// </summary>
        /// <returns></returns>
        private  string GetLocalIP()
        {
            string ip = null;
          var iplist = Dns.GetHostAddresses(Dns.GetHostName()).DefaultIfEmpty().ToList();
            iplist.ForEach(u =>
            {
                if (u.AddressFamily == AddressFamily.InterNetwork)
                    ip= u.ToString();
            });
            return ip;
        }
        private async Task LinkClientService()
        {
            var m = "Eohi_Frm_" + Guid.NewGuid().ToString();
            options = new MqttClientOptions
            {
                ClientId = m,
                CleanSession = true,
                ChannelOptions = new MqttClientTcpOptions
                {
                    Server = GetLocalIP(),
                    Port = 1884,
                },
                Credentials = new MqttClientCredentials()
                {
                    Username = "user",
                    Password = "123456"
                }


            };
            var factory = new MqttFactory();
            mqttClient = factory.CreateMqttClient() as MqttClient;
            try
            {
                await mqttClient.ConnectAsync(options);
                but_submsg_Click();
                this.Invoke(new Action(() => { lab_serverstatus.Text = "连接正常,服务运行中............"; }));
            }
            catch (Exception ex)
            {


            }


        }
        private async void but_submsg_Click()
        {
            if (mqttClient != null)
            {
                await mqttClient.SubscribeAsync(new TopicFilter("ClientsCount", MqttQualityOfServiceLevel.AtMostOnce));
                await mqttClient.SubscribeAsync(new TopicFilter("clientlink", MqttQualityOfServiceLevel.AtMostOnce));
                await mqttClient.SubscribeAsync(new TopicFilter("msglog", MqttQualityOfServiceLevel.AtMostOnce));
                mqttClient.ApplicationMessageReceived += (s, e) =>
                {
                    this.Invoke(new Action(() =>
                    {
                        var msg = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
                        if (msg.Length<=5)
                        {
                            lab_clientcount.Text = msg;
                        }
                        if (msg.Length>10)
                        {
                            if (msg.StartsWith("连接")    )
                                rtb_clientlog.AppendText(msg);
                            rtb_msglog.AppendText(msg);
                        }
   


                    }));
                };


            }
        }
        private void GetServiceStatus()
        {
            ServiceController[] serviceControllers = ServiceController.GetServices();
            if (serviceControllers.Length > 0)
            {
                serviceControllers.ToList().ForEach(u =>
                {
                    if (u.DisplayName == "MqttNetServiceAddUserAndPassword")
                    {
                        if (ServiceController == null)
                        {
                            ServiceController = u;
                        }
                        if (u.Status == ServiceControllerStatus.Running)
                        {
                            lab_serverstatus.Text = "服务运行中............";
                        }
                        else
                        {
                            lab_serverstatus.Text = "服务已停止............";
                        }
                    }
                });
            }
        }
        private void button2_Click(object sender, EventArgs e)
        {
            if (tabControl1.SelectedTab == tabPage1)
            {
                rtb_clientlog.Text = "";
            }
            else
            {
                rtb_msglog.Text = "";
            }
        }


        private void Form1_Load(object sender, EventArgs e)
        {


        }
    }

}


客户端:

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using System.Windows.Forms;


namespace MqttClientTest01
{


    public partial class Form1 : Form
    {
        private MqttClient mqttClient = null;
        private System.Timers.Timer timer = null;
        private int CountLink = 0;
        private MqttClientOptions options = null;
        public Form1()
        {
            InitializeComponent();
            ////创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中
            //timer = new System.Timers.Timer();
            //timer.AutoReset = true;
            //timer.Interval = 1000;
            //timer.Elapsed += new ElapsedEventHandler(LinkMqttNetService);
        }


        private void LinkMqttNetService(object sender, ElapsedEventArgs e)
        {
            if (mqttClient == null)
            {
                //   RunAsync();
                CountLink++;
            }
            if (CountLink >= 5)
            {
                MessageBox.Show("连接多次失败,请确认各参数是否正确!");
                CountLink = 0;
                timer.Enabled = false;
            }
        }
        private void but_linkserver_Click(object sender, EventArgs k)
        {
            LinkClientService();
            //CountLink = 0;
            //timer.Enabled = true;
            //timer.Start();
        }
        /// <summary>
        /// 链接客户端
        /// </summary>
        public async  void LinkClientService()
        {
            var m = "Eohi_" + Guid.NewGuid().ToString();
            options = new MqttClientOptions
            {
                ClientId = m,
                CleanSession = true,
                ChannelOptions = new MqttClientTcpOptions
                {
                    Server = txtb_serverip.Text.Trim(),
                    Port = Convert.ToInt32(txtb_serverport.Text.Trim()),
                },
                Credentials = new MqttClientCredentials()
                {
                    Username = tb_username.Text,
                    Password = tb_userpwd.Text
                }


            };
            var factory = new MqttFactory();
            mqttClient =  factory.CreateMqttClient() as MqttClient;
            try
            {
                await mqttClient.ConnectAsync(options);
                this.Invoke(new Action(() =>
                {
                    lab_linkstatus.Text = "连接成功!";
                    lab_linktimer.Text = DateTime.Now.ToString();
                }));
                mqttClient.Disconnected += async (s, e) =>
                {
                    if (e.ClientWasConnected==false)
                    {
                        try
                        {
                            await mqttClient.ConnectAsync(options);
                            this.Invoke(new Action(() =>
                            {
                                lab_linkstatus.Text = "连接成功!";
                                lab_linktimer.Text = DateTime.Now.ToString();
                            }));
                        }
                        catch (Exception ex)
                        {
                            lab_linkstatus.Text = "连接失败!"+ex.Message;
                            lab_linktimer.Text = DateTime.Now.ToString();
                        }


                    }
                };
            }
            catch (Exception ex)
            {
                lab_linkstatus.Text = "连接失败!请检查ip/端口" ;
                lab_linktimer.Text = DateTime.Now.ToString();
            }
 
        }


        private void tb_username_TextChanged(object sender, EventArgs e)
        {


        }


        private void but_clientsend_Click(object sender, EventArgs e)
        {
            if (mqttClient != null)
            {
                var message = new MqttApplicationMessageBuilder();
                message.WithTopic(txtb_msgtopic.Text.Trim());
                message.WithPayload(rtb_pubmsg.Text.Trim());
                message.WithExactlyOnceQoS();
                message.WithRetainFlag();
                mqttClient.PublishAsync(message.Build());
            }
        }
        private async void but_submsg_Click(object sender, EventArgs k)
        {
            if (mqttClient != null)
            {
                await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce));
                mqttClient.ApplicationMessageReceived += (s, e) =>
                {
                    this.Invoke(new Action(() =>
                    {
                        rtb_submsgclient.AppendText("ClientID=" + e.ClientId + "\n");
                        rtb_submsgclient.AppendText($"+ Topic = {e.ApplicationMessage.Topic}" + "\n");
                        rtb_submsgclient.AppendText($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload) + "\n"}");
                        rtb_submsgclient.AppendText($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}" + "\n");
                        rtb_submsgclient.AppendText($"+ Retain = {e.ApplicationMessage.Retain}" + "\n");


                    }));


                };


            }
        }


        private void button1_Click(object sender, EventArgs e)
        {
            rtb_submsgclient.Text = "";
        }
    }
}