客户端跟服务端链接创建,分两步:服务器
实例化ChannelManager的时候,根据配置的第一个server,从远程服务器读取服务器列表,若是能读取到,则顺序创建链接,直到创建成功为止;若是不能 读到,则根据本地配置的列表,逐个创建链接,直到成功为止。负载均衡
每间隔10s,检查当前channelFuture是否活跃,活跃,则300s检查一次,不活跃,则执行检查。检查的逻辑是:比较本地server列表跟远程服务提供的列表是否相等,不相等则根据远程服务提供的server列表顺序的从新创建第一个能用的ChannelFuture异步
若是有积压,或者关闭 掉了,则关闭当前链接,将activeIndex=-1,表示当前链接不可用。tcp
从0到activeIndex中找一个能链接的server,中心创建一个链接。若是activeIndex为-1,则从整个的server列表中顺序的找一个可用的链接创建链接。ide
客户端实例化DefaultTransportManager对象时,回按照以下流程先实例化m_tcpSocketSender,接着实例化ChannelManager。ChannelManager管理对服务端的netty链接。 实例化流程以下: 线程
ChannelManager经过ChannelHolder把netty的ChannnelFuture封装起来。ChannnelFuture结构以下:netty
public static class ChannelHolder { /** * 当前活跃的channelFuture */ private ChannelFuture m_activeFuture; /** * 当前server在m_serverAddresses中的第几个 */ private int m_activeIndex = -1; /** * 当前活跃的ChannelFuture对应的配置 */ private String m_activeServerConfig; /** * 从配置文件中读取的服务端列表 */ private List<InetSocketAddress> m_serverAddresses; /** * 当前活跃的ChannelFutre对应的ip */ private String m_ip; /** * 链接从第一次初始化开始,是否发生过变动 */ private boolean m_connectChanged; //省略其它的代码 }
ChannelManager内部每隔10秒钟,检查netty链接。这部分代码以下:code
@Override public void run() { while (m_active) { /* * make save message id index asyc * 本地存储index,和 时间戳,防止重启,致使本地的消息id重了 */ m_idfactory.saveMark(); /** * 检查本地初始化的服务列表跟远程的服务列表是否有差别,若是有差别,则取远程第一个能创建链接的server,创建一个新的链接, * 关闭旧的链接 */ checkServerChanged(); ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture(); List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses(); /** * 检查当前channelFuture是否有消息积压(本地队列长度超过4990),或者 channelFuture不是开的 * @param activeFuture */ doubleCheckActiveServer(activeFuture); /** * 从serverAddresses列表里面,重新顺序选一个,从新链接 */ reconnectDefaultServer(activeFuture, serverAddresses); try { Thread.sleep(10 * 1000L); // check every 10 seconds } catch (InterruptedException e) { // ignore } } }
总结:服务端没有作到负载均衡,链接会慢慢链接到server列表里面第一个可用的server上。server