异步和事件驱动是Netty设计的关键。html
Channel:一个链接就是一个Channel。
Channel是Socket的封装,提供绑定,读,写等操做,下降了直接使用Socket的复杂性。Channel是Socket的抽象,能够被注册到一个EventLoop上,EventLoop至关于Selector,每个EventLoop又有本身的处理线程。
json
回调:通知的基础。bootstrap
EventLoop
咱们以前就讲过EventLoop这里回顾一下:服务器
一个 EventLoopGroup 包含一个或者多个 EventLoop;
一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
全部由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
一个 Channel 在它的生命周期内只注册于一个 EventLoop;
一个 EventLoop 可能会被分配给一个或多个 Channel。app
ChannelHandler是处理数据的逻辑容器框架
ChannelInboundHandler是接收并处理入站事件的逻辑容器,能够处理入站数据以及给客户端以回复。异步
ChannelPipeline是将ChannelHandler穿成一串的的容器。async
编码器和解码器都实现了ChannelInboundHandler和 ChannelOutboundHandler接口用于处理入站或出站数据。ide
字节级操做,工控协议的话大多都是字节流,咱们之前的方式就是拼,大概就是:对照协议这两个字节是什么,后四个字节表示什么意思。如今DotNetty提供了一个ByteBuffer来简化咱们对于字节流的操做。适用工控协议。json格式的,可是底层仍是字节流。oop
DotNetty由九个项目构成,在NuGet中都是单独的包,能够按需引用,其中比较重要的几个是如下几个:
配置成服务器管道,TcpServerSocketChannel,之因此配置成服务器管道缘由是与客户端管道不一样,服务器管道多了侦听服务。将服务端的逻辑处理代码Handler以pipeline形式添加到channel中去。
using DotNetty.Transport.Bootstrapping; using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Sockets; using System; using System.Threading.Tasks; namespace EchoServer { class Program { static async Task RunServerAsync() { IEventLoopGroup eventLoop; eventLoop = new MultithreadEventLoopGroup(); try { // 服务器引导程序 var bootstrap = new ServerBootstrap(); bootstrap.Group(eventLoop); bootstrap.Channel<TcpServerSocketChannel>() // 保持长链接 .ChildOption(ChannelOption.SoKeepalive, true); bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; pipeline.AddLast(new EchoServerHandler()); })); IChannel boundChannel = await bootstrap.BindAsync(3000); Console.ReadLine(); await boundChannel.CloseAsync(); } catch (Exception ex) { Console.WriteLine(ex); } finally { await eventLoop.ShutdownGracefullyAsync(); } } static void Main(string[] args) => RunServerAsync().Wait(); } }
接收连入服务端代码的客户端消息,并将此消息从新返回给客户端。
using DotNetty.Buffers; using DotNetty.Transport.Channels; using System; using System.Text; namespace EchoServer { /// <summary> /// 由于服务器只须要响应传入的消息,因此只须要实现ChannelHandlerAdapter就能够了 /// </summary> public class EchoServerHandler : ChannelHandlerAdapter { /// <summary> /// 每一个传入消息都会调用 /// 处理传入的消息须要复写这个方法 /// </summary> /// <param name="ctx"></param> /// <param name="msg"></param> public override void ChannelRead(IChannelHandlerContext ctx, object msg) { IByteBuffer message = msg as IByteBuffer; Console.WriteLine("收到信息:" + message.ToString(Encoding.UTF8)); ctx.WriteAsync(message); } /// <summary> /// 批量读取中的最后一条消息已经读取完成 /// </summary> /// <param name="context"></param> public override void ChannelReadComplete(IChannelHandlerContext context) { context.Flush(); } /// <summary> /// 发生异常 /// </summary> /// <param name="context"></param> /// <param name="exception"></param> public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine(exception); context.CloseAsync(); } } }
配置须要链接的服务端ip地址及其端口号,而且配置客户端的处理逻辑代码以pipeline形式添加到channel中去。
using DotNetty.Transport.Bootstrapping; using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Sockets; using System; using System.Net; using System.Threading.Tasks; namespace EchoClient { class Program { static async Task RunClientAsync() { var group = new MultithreadEventLoopGroup(); try { var bootstrap = new Bootstrap(); bootstrap .Group(group) .Channel<TcpSocketChannel>() .Handler(new ActionChannelInitializer<ISocketChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; pipeline.AddLast(new EchoClientHandler()); })); IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse("192.168.1.11"), 3000)); Console.ReadLine(); await clientChannel.CloseAsync(); } catch (Exception ex) { Console.WriteLine(ex); } finally { await group.ShutdownGracefullyAsync(); } } static void Main(string[] args) => RunClientAsync().Wait(); } }
客户端链接成功后,向服务端发送消息,接受到服务端消息,对消息计数再发还给服务端。
using DotNetty.Buffers; using DotNetty.Transport.Channels; using System; using System.Collections.Generic; using System.Text; namespace EchoClient { public class EchoClientHandler : SimpleChannelInboundHandler<IByteBuffer> { public static int i=0; /// <summary> /// Read0是DotNetty特有的对于Read方法的封装 /// 封装实现了: /// 1. 返回的message的泛型实现 /// 2. 丢弃非该指定泛型的信息 /// </summary> /// <param name="ctx"></param> /// <param name="msg"></param> protected override void ChannelRead0(IChannelHandlerContext ctx, IByteBuffer msg) { if (msg != null) { i++; Console.WriteLine($"Receive From Server {i}:" + msg.ToString(Encoding.UTF8)); } ctx.WriteAsync(Unpooled.CopiedBuffer(msg)); } public override void ChannelReadComplete(IChannelHandlerContext context) { context.Flush(); } public override void ChannelActive(IChannelHandlerContext context) { Console.WriteLine($"发送客户端消息"); context.WriteAndFlushAsync(Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes($"客户端消息!"))); } public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine(exception); context.CloseAsync(); } } }
备注:红框表示接收到消息的序号。