ActiveMQ支持多种通信协议TCP/UDP等,咱们选取最经常使用的TCP来分析ActiveMQ的通信机制。首先咱们来明确一个概念:
客户(Client):消息的生产者、消费者对ActiveMQ来讲都叫做客户。
消息中介(Message broker):接收消息并进行相关处理后分发给消息的消费者.网络
为了能清楚的描述出ActiveMQ的核心通信机制,咱们选择3个部分来进行说明,它们分别是创建连接、关闭连接、心跳。 并发
1、Client跟activeMQ的TCP通信的初始化过程分析以下:
(1) ActiveMQ初始化时,经过TcpTransportServer类根据配置打开TCP侦听端口,客户端经过该端口发起创建连接的动做。
(2) 把接收到的socket放入阻塞队列。
(3) 另一个线程Socket handler阻塞着,监听是否有新的socket,若是有则取出来。
(4) 生成一个TransportConnection的实例。TransportConnection类的主要做用是处理链路的状态信息,并实现CommandVisitor接口来完成各种消息的处理。
(5) TransportConnection会使用一个由多个TransportFilter实例组成的消息处理链条,负责对接收到的各种消息进行处理并发送相应的应答。这个链条的典型组成顺序: socket
MutexTransport->WireFormatNegotiator->InactivityMonitor->TcpTransport。在这条链条中最后的一环是TcpTransport类,它是实际和Client获取和发送数据的地方,该类的重要方法有run()和oneway(),一个负责读取,一个负责发送。
(6) 建链完成,能够进行通信操做。 spa
2、关闭连接
ActiveMQ发现TCP连接的关闭,最关键的代码在TcpBufferedInputStream类中的 线程
int n = in.read(buffer, position, buffer.length - position); 3d
3、心跳
为了更好的维护TCP链路的使用,ActiveMQ采用了心跳机制做为判断双方链路的健康状况。ActiveMQ使用的是双向心跳,也就是ActiveMQ的Broker和Client双方都进行相互心跳,但无论是Broker或Client心跳的具体处理状况是彻底同样的,都在InactivityMonitor类中实现,下面具体介绍。
心跳会产生两个线程“InactivityMonitor ReadCheck”和“InactivityMonitor WriteCheck”,它们都是Timer类型,都会隔一段固定时间被调用一次。ReadCheck线程主要调用的方法是readCheck(),当在等待时间内,有消息接收到,则该方法会返回true。WriteCheck线程主要调用的方法是writeCheck()。orm
这有个小技巧,你们能够参考一下,那就是当WriteCheck线程休眠时,有任何数据发送成功,则该线程被唤醒后,不用经过TCP向对方真的发送心跳消息,这样能够从必定程度上减小网络传输的数据量。blog