[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。 当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.0-rc5。本文档是对4.0.0-rc5分支代码进行分析。
对NetMQ的源码进行学习并分析理解,所以写下该系列文章,本系列文章暂定编写计划以下:github
友情提示: 看本系列文章时最好获取源码,更有助于理解。编程
NetMQ 4.0.0底层使用的是IOCP(即完成端口)模式进行通讯的(3.3.4使用的是select模型),经过异步IO绑定到完成端口,来最大限度的提升性能。这里不对同步/异步socket进行详细介绍。稍微解释下完成端口,为了解决每一个socket客户端使用一个线程进行通讯的性能问题,完成端口它充分利用内核对象的调度,只使用少许的几个线程来处理和客户端的全部通讯,消除了无谓的线程上下文切换,最大限度的提升了网络通讯的性能。
想详细了解完成端口的请看完成端口(Completion Port)详解,讲解的比较详细,同时对各类网络编程模型作了简单的介绍。
所以NetMQ经过几个(默认1个)IO线程处理通讯,上一片文章介绍了ZObejct对象,在该对象中存在许多命令的处理,实际对命令的发送,分配都是IO线程的工做。数组
IO线程初始化时会初始化Proactor
和IOThreadMailbox
网络
var name = "iothread-" + threadId; m_proactor = new Proactor(name); m_mailbox = new IOThreadMailbox(name, m_proactor, this);
Proactor
对象就是用来绑定或处理完成端口用的,后面再作做详细介绍。
IOThreadMailbox
是IO线程处理的信箱,每当有命令须要处理时,都会向当前Socket
对象所在的IO线程信箱发送命令。
让咱们看一眼IOThread
对象和IOThreadMailbox
的定义负载均衡
internal sealed class IOThread : ZObject, IMailboxEvent { }
IOThread
对象继承自ZObject
对象,记得上一节想到ZObject对象知道如何处理各类命令吗?所以IOThread
对象也继承了他父亲的技能。同时IOThread
对象实现了IMailboxEvent
接口,这个接口之定义了一个方法。异步
internal interface IMailboxEvent { void Ready(); }
当IO信箱接受到命令时表示当前有命令准备好了,能够进行 处理,IO信箱则会调用IO线程的Ready方法处理命令,那么IO信息如何调用IO线程的Ready方法呢,来看下IOThreadMailbox
的构造函数。socket
internal class IOThreadMailbox : IMailbox { ... public IOThreadMailbox([NotNull] string name, [NotNull] Proactor proactor, [NotNull] IMailboxEvent mailboxEvent) { m_proactor = proactor; m_mailboxEvent = mailboxEvent; Command cmd; bool ok = m_commandPipe.TryRead(out cmd); } ... }
在IOThreadMailbox初始化时,传入了IMailboxEvent。async
m_commandPipe是NetMQ的管道(Pipe),后面咱们会对其作介绍,这里只要知道该管道用于存放命令便可,能够__暂时__理解为管道队列。
每一个IOThread
会有一个Proactor
,Proactor
的工做就是将Socket
对象绑定到完成端口,而后定时去扫描完成端口是否有须要处理的Socket
对象。
internal class Proactor : PollerBase { ... public Proactor([NotNull] string name) { m_name = name; m_stopping = false; m_stopped = false; m_completionPort = CompletionPort.Create(); m_sockets = new Dictionary<AsyncSocket, Item>(); } ... }
Proactor
对象继承自PollerBase
,那么PollerBase
又是什么呢?从命名能够看这是一个轮询基类,即该对象须要长时间不断循环处理某件事情。
PollerBase
对象是一个抽象类,它有2个功能:
负载均衡
还记的Context中选择IO线程时有这个一段代码吗?
IO线程的负载均衡功能就是PollBase对象提供的
m_load
字段值+1protected void AdjustLoad(int amount) { Interlocked.Add(ref m_load, amount); }
public int Load { get { #if NETSTANDARD1_3 return Volatile.Read(ref m_load); #else Thread.MemoryBarrier(); return m_load; #endif } }
IOThread
取PollBase对象(Proactor)的Load属性时候会特殊处理,保证拿到的是最新的值。定时任务
PollBase第二个功能就是支持定时任务,即定时触发某事件。
private readonly SortedList<long, List<TimerInfo>> m_timers;
PollBase内部有一个SortedList
,key为任务执行的时间,value为TimeInfo
。
TimeInfo
对象包含2个信息,id
和ITimerEvent
接口,id
用来辨别当前任务的类型,ITimerEvent
接口就包含了TimerEvent
方法,即如何执行。
如TcpConnection
链接失败会从新链接时会重连,下面时TcpConnection
开始链接方法
private void StartConnecting() { Debug.Assert(m_s == null); // Create the socket. try { m_s = AsyncSocket.Create(m_addr.Resolved.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp); } catch (SocketException) { AddReconnectTimer(); return; } ... } private void AddReconnectTimer() { //获取重连时间间隔 int rcIvl = GetNewReconnectIvl(); //IO线程的Proactor中,TcpConnection的ReconnectTimerId = 1 m_ioObject.AddTimer(rcIvl, ReconnectTimerId); ... }
IO线程会被封装到IOObject
中,调用IOObject
的AddTimer
方法实际就是调用IO线程中Proactor
对象的AddTimer
方法,其方法定义以下
public void AddTimer(long timeout, [NotNull] IProactorEvents sink, int id) { long expiration = Clock.NowMs() + timeout; var info = new TimerInfo(sink, id); if (!m_timers.ContainsKey(expiration)) m_timers.Add(expiration, new List<TimerInfo>()); m_timers[expiration].Add(info); }
第一行会获取当前的毫秒时间加上时间间隔。而后加入到m_timers
中。
m_completionPort = CompletionPort.Create(); m_sockets = new Dictionary<AsyncSocket, Item>();
初始化时会建立完成端口,当有socket须要处理时会和完成端口绑定。
初始化时还会初始化一个存放异步AsyncSocket
和item
的字典。
有关于AsyncSocket
和CompletionPort
能够去Git上看AsyncIO的源码,这里不作分析。
Item
结构以下
private class Item { public Item([NotNull] IProactorEvents proactorEvents) { ProactorEvents = proactorEvents; Cancelled = false; } [NotNull] public IProactorEvents ProactorEvents { get; } public bool Cancelled { get; set; } }
它包含了IProactorEvents
接口的信息和当前Socket
操做是否被取消标志。
internal interface IProactorEvents : ITimerEvent { void InCompleted(SocketError socketError, int bytesTransferred); void OutCompleted(SocketError socketError, int bytesTransferred); }
IProactorEvents
继承自ITimerEvent
。同时它还声明了InCompleted
和OutCompleted
方法,即发送或接收完成时如何处理,所以当须要处理Socket
时,会将当前Socket
处理方式保存到这个字典中。当当前对象发送消息完成,则会调用OutCompleted
方法,接收完成时则会调用InCompleted
方法。
当有Socket
须要绑定时会调用Proactor
的AddSocket
方法
public void AddSocket(AsyncSocket socket, IProactorEvents proactorEvents) { var item = new Item(proactorEvents); m_sockets.Add(socket, item); m_completionPort.AssociateSocket(socket, item); AdjustLoad(1); }
它包含2个参数,一个时异步Socket
对象和IProactorEvents
。而后加把他们加入到字段中并将他们绑定到完成端口上。第四段AdjustLoad
方法即把当前IO线程处理数量+1,用于负载均衡用。
当Socket
操做完成时会调用Proactor
的RemoveSocket
移除绑定
public void RemoveSocket(AsyncSocket socket) { AdjustLoad(-1); var item = m_sockets[socket]; m_sockets.Remove(socket); item.Cancelled = true; }
移除时会将item
的Cancelled
字段设置为true
。因此当Proactor
轮询处理Socket
时发现该Socket
操做被取消(移除),就会跳过处理。
在IO线程启动时实际就是启动Procator的work线程
public void Start() { m_proactor.Start(); }
public void Start() { m_worker = new Thread(Loop) { IsBackground = true, Name = m_name }; m_worker.Start(); }
完整的Loop
方法以下
private void Loop() { var completionStatuses = new CompletionStatus[CompletionStatusArraySize]; while (!m_stopping) { // Execute any due timers. int timeout = ExecuteTimers(); int removed; if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed)) continue; for (int i = 0; i < removed; i++) { try { if (completionStatuses[i].OperationType == OperationType.Signal) { var mailbox = (IOThreadMailbox)completionStatuses[i].State; mailbox.RaiseEvent(); } // if the state is null we just ignore the completion status else if (completionStatuses[i].State != null) { var item = (Item)completionStatuses[i].State; if (!item.Cancelled) { switch (completionStatuses[i].OperationType) { case OperationType.Accept: case OperationType.Receive: item.ProactorEvents.InCompleted( completionStatuses[i].SocketError, completionStatuses[i].BytesTransferred); break; case OperationType.Connect: case OperationType.Disconnect: case OperationType.Send: item.ProactorEvents.OutCompleted( completionStatuses[i].SocketError, completionStatuses[i].BytesTransferred); break; default: throw new ArgumentOutOfRangeException(); } } } } catch (TerminatingException) { } } } }
var completionStatuses = new CompletionStatus[CompletionStatusArraySize];
第一行初始化了CompletionStatus
数组,CompletionStatusArraySize
值为100。
CompletionStatus
做用是用来保存socket的信息或状态。
int timeout = ExecuteTimers();
protected int ExecuteTimers() { if (m_timers.Count == 0) return 0; long current = Clock.NowMs(); var keys = m_timers.Keys; for (int i = 0; i < keys.Count; i++) { var key = keys[i]; if (key > current) { return (int)(key - current); } var timers = m_timers[key]; foreach (var timer in timers) { timer.Sink.TimerEvent(timer.Id); } timers.Clear(); m_timers.Remove(key); i--; } return 0; }
ExecuteTimers
会计算以前加入到m_timers
须要等待的超时时间,若没有对象则直接返回0,不然获取若获取到key时间在当前时间以前,则须要调用TimerEvent
方法,调用完成后移除。
若获取到的key时间比当前时间大,则返回他们的差即为须要等待的超时时间。
int removed; if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed)) continue;
GetMultipleQueuedCompletionStatus
方法传入一个超时时间,若前面获取的超时时间为0,则这边会设置为-1,表示阻断直到有要处理的才返回。
CompletionPort
内部维护了一个状态队列,removed
即为处理完成返回的状态个数。
若获取成功则会返回true
,后面就开始遍历completionStatuses
数组处理完成Socket
。
public struct CompletionStatus { internal CompletionStatus(AsyncSocket asyncSocket, object state, OperationType operationType, SocketError socketError, int bytesTransferred) : this() { AsyncSocket = asyncSocket; State = state; OperationType = operationType; SocketError = socketError; BytesTransferred = bytesTransferred; } public AsyncSocket AsyncSocket { get; private set; } public object State { get; internal set; } public OperationType OperationType { get; internal set; } public SocketError SocketError { get; internal set; } public int BytesTransferred { get; internal set; } }
CompletionStatus
是个结构体,它包含的信息如上。其中OperationType
是当前Socket
的处理方式。
public enum OperationType { Send, Receive, Accept, Connect, Disconnect, Signal }
在for
循环的一开始先会判断当前状态的OperationType
,如果Signal,则说明当前是个信号状态,说明有命令须要处理,则会调用IO信箱的RaiseEvent
方法,实际为IO线程的Ready
方法。
public void Ready() { Command command; while (m_mailbox.TryRecv(out command)) command.Destination.ProcessCommand(command); }
IOThread
会将当前信箱的全部命令进行处理。
若不是Signal
则会将CompletionStatus
保存的状态信息转换为Item
对象,并判断当前Socket
是否移除(取消)。若没有则对其进行处理。判断OperationType
,若为Accept
或Receive
则表示须要接收,则调用InCompleted
方法。若为Connect
,Disconnect
或Send
则表示有消息向外发送,则调用OutCompleted
方法。
至此IOThread
代码分析完毕。
internal class IOObject : IProactorEvents { public IOObject([CanBeNull] IOThread ioThread) { if (ioThread != null) Plug(ioThread); } public void Plug([NotNull] IOThread ioThread) { Debug.Assert(ioThread != null); m_ioThread = ioThread; } }
IOObject
实际就是保存了IOThread
的信息和Socket
处理完成时如何执行,以及向外暴露了一些接口。
再次说明,若是向简单了解完成端口如何使用,则看《完成端口使用》,若是想详细了解完成端口则看下《完成端口详细介绍》,若是想直到NetMQ的AsyncIO和完成端口的源码请看AsyncIO。
该篇介绍了IO线程和完成端口的处理方式,若哪里分析的不到位或有误但愿支出。
本文地址:http://www.javashuo.com/article/p-ghjehjvc-z.html 做者博客:杰哥很忙 欢迎转载,请在明显位置给出出处及连接)