NetCore下搭建websocket集群方案

介绍

最近在作一个基于netcore的实时消息服务。最初选用的是ASP.NET Core SignalR,可是后来发现目前它并无支持IOS的客户端,因此本身只好又基于websocket从新搭建了一套服务。html

由于前期已经使用了SignalR,因此我直接在本来的项目里面从新扩展了一套自定义websocket服务。前端

在网上有一篇博文介绍了如何在Asp.net Core中使用中间件来管理websocket,个人大部分代码也是参考这篇文章。在这儿贴个连接web

在Asp.net Core中使用中间件来管理websocketredis

自定义WebSocket 中间件

要阅读ASP.NET Core中的WebSockets支持,能够在此处查看。若是你的项目跟我同样,已经使用了Signalr,那么你不须要在安装Microsoft.AspNetCore.WebSockets包,不然在项目开始前,json

须要安装此Nuget包。如今你能够自定义你本身的中间件了。后端

/// <summary>
    /// websocket 协议扩展中间件 /// </summary>
    public class CustomWebSocketMiddlewarr { private readonly RequestDelegate _next; public CustomWebSocketMiddlewarr(RequestDelegate next) { _next = next; } public async Task Invoke(HttpContext context, ICustomWebSocketFactory wsFactory, ICustomWebSocketMessageHandler wsmHandler) { if (context.WebSockets.IsWebSocketRequest) { string ConId = context.Request.Query["sign"]; if (!string.IsNullOrEmpty(ConId)) { WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync(); CustomWebSocket userWebSocket = new CustomWebSocket() { WebSocket = webSocket, ConId = ConId }; wsFactory.Add(userWebSocket); //await wsmHandler.SendInitialMessages(userWebSocket);
                    await Listen(context, userWebSocket, wsFactory, wsmHandler); } } else { context.Response.StatusCode = 400; } await _next(context); }      //监听客户端发送过来的消息 private async Task Listen(HttpContext context, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory, ICustomWebSocketMessageHandler wsmHandler) { WebSocket webSocket = userWebSocket.WebSocket; var buffer = new byte[1024 * 4]; WebSocketReceiveResult result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); while (!result.CloseStatus.HasValue) { await wsmHandler.HandleMessage(result, buffer, userWebSocket, wsFactory); buffer = new byte[1024 * 4]; result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); } wsFactory.Remove(userWebSocket.ConId); await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None); } }

在自定义的中间件中,首先判断是不是websocket请求,若是是的话,在查看是否有对应的sign标识,知足条件后进入后续的处理环节。缓存

简单讲解一下这里面的处理逻辑。由于个人项目中同时存在Signalr,而Signalr也会使用到websocket协议。可是Signalr的websocket请求传入的参数是id,因此我在这儿自定义了一个参数sign为了和Signalrwebsocket

作区分。那么这个sign是作什么用的呢? 其实sign是前端传过来的惟一标识,和这次链接对应,也能够理解为Signalr里面的connectionId。而后会把标识和对应websocket类到存入到一个list集合中。即代码app

中的  wsFactory.Add(userWebSocket)。框架

CustomWebSocket是一个包含WebSocket和标识的类:

public  class CustomWebSocket { public string ConId { get; set; } public WebSocket WebSocket { get; set; } }

而后定义了一个Websocket工厂类,用来存取链接到服务的Websocket实例。

//接口
public
interface ICustomWebSocketFactory { void Add(CustomWebSocket uws); void Remove(string conId); List<CustomWebSocket> All(); List<CustomWebSocket> Others(CustomWebSocket client); CustomWebSocket Client(string conId); }
  

具体实现

public class CustomWebSocketFactory: ICustomWebSocketFactory { List<CustomWebSocket> List; public CustomWebSocketFactory() { List = new List<CustomWebSocket>(); } public void Add(CustomWebSocket uws) { List.Add(uws); } public void Remove(string conId) { List.Remove(Client(conId)); } public List<CustomWebSocket> All() { return List; } public List<CustomWebSocket> Others(CustomWebSocket client) { return List.Where(c => c.ConId != client.ConId).ToList(); } public CustomWebSocket Client(string conId) { var uws= List.FirstOrDefault(c => c.ConId == conId); return uws; } }

能够看到最终咱们存取websocket都是经过list来进行,因此在注入的时候必定要注意。注入成单例模式。

services.AddSingleton<ICustomWebSocketFactory, CustomWebSocketFactory>();

CustomWebSocketMessageHandle包含有关消息处理的逻辑(发送,接收)
public interface ICustomWebSocketMessageHandler { Task SendInitialMessages(CustomWebSocket userWebSocket); Task HandleMessage(WebSocketReceiveResult result, byte[] buffer, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory); Task SendMessageInfo(string conId, object data, ICustomWebSocketFactory wsFactory); } public   class CustomWebSocketMessageHandler:ICustomWebSocketMessageHandler { public async Task SendInitialMessages(CustomWebSocket userWebSocket) { WebSocket webSocket = userWebSocket.WebSocket; var msg = new CustomWebSocketMessage { MessagDateTime = DateTime.Now, Type = WSMessageType.链接响应 }; string serialisedMessage = JsonConvert.SerializeObject(msg); byte[] bytes = Encoding.ASCII.GetBytes(serialisedMessage); await webSocket.SendAsync(new ArraySegment<byte>(bytes, 0, bytes.Length), WebSocketMessageType.Text, true, CancellationToken.None); } /// <summary>
        /// 推送消息到客户端 /// </summary>
        /// <returns></returns>
        public async Task SendMessageInfo(string conId,object data, ICustomWebSocketFactory wsFactory) { var uws = wsFactory.Client(conId); CustomWebSocketMessage message = new CustomWebSocketMessage(); message.DataInfo = data; message.Type = WSMessageType.任务数量; message.MessagDateTime = DateTime.Now; if (uws == null) { //广播到其余集群节点
                var listpush = new List<PushMsg>(); var push = new PushMsg() { sendjsonMsg = new WebSocketFanoutDto() { conId = conId, data = message }, exchangeName = "saas.reltimewsmes.exchange", sendEnum = SendEnum.订阅模式 }; listpush.Add(push); BTRabbitMQManage.PushMessageAsync(listpush); return; } var mesbuffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); var mescount = Encoding.UTF8.GetByteCount(JsonConvert.SerializeObject(message)); await uws.WebSocket.SendAsync(new ArraySegment<byte>(mesbuffer, 0, mescount), WebSocketMessageType.Text, true, CancellationToken.None); } /// <summary>
        /// 处理接收到的客户端信息 /// </summary>
        /// <param name="result"></param>
        /// <param name="buffer"></param>
        /// <param name="userWebSocket"></param>
        /// <param name="wsFactory"></param>
        /// <returns></returns>
        public async Task HandleMessage(WebSocketReceiveResult result, byte[] buffer, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory) { string msg = Encoding.UTF8.GetString(buffer); try { var message = JsonConvert.DeserializeObject<CustomWebSocketMessage>(msg); if (message.Type == WSMessageType.用户信息) { var logdto = JsonConvert.DeserializeObject<LoginInfoDto>(message.DataInfo.ToJsonString()); await InitUserInfo(logdto, userWebSocket, wsFactory); } } catch (Exception e) { var exbuffer = Encoding.UTF8.GetBytes(e.Message); var excount = Encoding.UTF8.GetByteCount(e.Message); await userWebSocket.WebSocket.SendAsync(new ArraySegment<byte>(exbuffer, 0, excount), result.MessageType, result.EndOfMessage, CancellationToken.None); } } /// <summary>
        /// 初始化用户链接关系 /// </summary>
        /// <param name="dto"></param>
        /// <param name="userWebSocket"></param>
        /// <param name="wsFactory"></param>
        /// <returns></returns>
        private async Task InitUserInfo(LoginInfoDto dto, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory) { if (dto.userId == 0) return; var contectid = userWebSocket.ConId; var key = ""; if (dto.tenantId.HasValue) key += "T_" + dto.userId + "_" + dto.tenantId + "_" + "tenant_"; if (dto.bankId.HasValue) key += "B_" + dto.userId + "_" + dto.bankId + "_" + "bank_"; key += dto.fromeType; //添加缓存
            CacheInstace<string>.GetRedisInstanceDefaultMemery().AddOrUpdate(key, contectid, r => { r = contectid; return r; }); CacheInstace<string>.GetRedisInstanceDefaultMemery().Expire(key, new TimeSpan(12, 0, 0));  } }
在这里面,推送消息到客户端的时候,若是未找到标识对应的Websocket对象,则将消息广播到全部的集群节点上。咱们知道Signalr里面的集群实现经过redis来作的,但在此处,由于
我项目里面已经搭建了Rabbitmq的高可用集群,因此我直接经过Rabbitmq来进行广播。这样无论我是在集群的那个节点上来推送消息,均可以保证消息被正确推送到客户端。
关于广播消息的订阅实现:
public class WebSocketFanoutDto { public string conId { get; set; } public CustomWebSocketMessage data { get; set; } } public class FanoutMesConsume : IMessageConsume { public void Consume(string message) { var condto = JsonConvert.DeserializeObject<WebSocketFanoutDto>(message); var wsFactory = IOCManage.ServiceProvider.GetService<ICustomWebSocketFactory>(); var uws = wsFactory.Client(condto.conId); if (uws != null) { //发送消息
                var mesbuffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(condto.data)); var mescount = Encoding.UTF8.GetByteCount(JsonConvert.SerializeObject(condto.data)); uws.WebSocket.SendAsync(new ArraySegment<byte>(mesbuffer, 0, mescount), WebSocketMessageType.Text, true, CancellationToken.None); } } }
 

最后在扩展类里面添加消息监视和注入Websocket中间件。

固然不要忘记 消息处理类的依赖注入

services.AddSingleton<ICustomWebSocketMessageHandler, CustomWebSocketMessageHandler>();
 
public static IApplicationBuilder UseCustomWebSocketManager(this IApplicationBuilder app) { //添加针对分布式集群的消息监视
            RabbitMQManage.Subscribe<FanoutMesConsume>(new MesArgs() { exchangeName = "reltimewsmes.exchange", sendEnum = SendEnum.订阅模式 }); return app.UseMiddleware<CustomWebSocketMiddlewarr>(); }

至此这个框架搭建完成,最后在startup类中注入。

关于Rabbitmq的使用,发送和接收是我基于easynetq封装的一个帮助类,你们能够自行实现。

这里面最主要的逻辑就是每个websocket实例都有一个对应的标识,而后在链接成功后,前端会发送用户信息,后端服务再把用户信息和链接标识关联。这样若是想推送信息到某个用户的话,就能够经过

用户信息来找到用户对应的链接信息。至于为何整个流程会这么复杂的,就一言难尽(我能怎么办,我也很绝望啊)。大多数时候你们均可以直接经过token认证来绑定用户和socket链接。

目前还有几个问题一个广播消息的时候,发送消息方也会收到这个消息,这挺尴尬,目前我还没想到太好的解决办法。

第二个是采用单例list字段存储链接的websocket实例,少的时候还好,若是多的话,感受可能会存在堆栈溢出的问题,但没实际测试过,因此目前还不知道最大的链接数多少。

 

原文出处:https://www.cnblogs.com/dandan123/p/10059026.html

相关文章
相关标签/搜索