大众点评Cat源码阅读(七)——客户端选择server的机制

1、概要思路

客户端跟服务端链接创建,分两步:服务器

  1. 初始ChannelMananger的时候 ;
  2. ChannelManager异步线程,每隔10秒作一次检查。

1.1 初始ChannelMananger的时候

实例化ChannelManager的时候,根据配置的第一个server,从远程服务器读取服务器列表,若是能读取到,则顺序创建链接,直到创建成功为止;若是不能 读到,则根据本地配置的列表,逐个创建链接,直到成功为止。负载均衡

1.2 ChannelManager异步线程,每隔10秒作一次检查

1.2.1 检查server列表是否变动

每间隔10s,检查当前channelFuture是否活跃,活跃,则300s检查一次,不活跃,则执行检查。检查的逻辑是:比较本地server列表跟远程服务提供的列表是否相等,不相等则根据远程服务提供的server列表顺序的从新创建第一个能用的ChannelFuture异步

1.2.2 查看当前客户端是否有积压,或者channelFuture是否被关闭

若是有积压,或者关闭 掉了,则关闭当前链接,将activeIndex=-1,表示当前链接不可用。tcp

1.2.3 重连默认server

从0到activeIndex中找一个能链接的server,中心创建一个链接。若是activeIndex为-1,则从整个的server列表中顺序的找一个可用的链接创建链接。ide

2、ChannelManager 实例化,创建netty链接逻辑

客户端实例化DefaultTransportManager对象时,回按照以下流程先实例化m_tcpSocketSender,接着实例化ChannelManager。ChannelManager管理对服务端的netty链接。 实例化流程以下: 输入图片说明线程

ChannelManager经过ChannelHolder把netty的ChannnelFuture封装起来。ChannnelFuture结构以下:netty

public static class ChannelHolder {
		/**
		 * 当前活跃的channelFuture
		 */
		private ChannelFuture m_activeFuture;

		/**
		 * 当前server在m_serverAddresses中的第几个
		 */
		private int m_activeIndex = -1;

		/**
		 * 当前活跃的ChannelFuture对应的配置
		 */
		private String m_activeServerConfig;

		/**
		 * 从配置文件中读取的服务端列表
		 */
		private List<InetSocketAddress> m_serverAddresses;

		/**
		 * 当前活跃的ChannelFutre对应的ip
		 */
		private String m_ip;

		/**
		 * 链接从第一次初始化开始,是否发生过变动
		 */
		private boolean m_connectChanged;
                
                //省略其它的代码
}

3、ChannelManager内部异步线程,动态切换netty链接逻辑

ChannelManager内部每隔10秒钟,检查netty链接。这部分代码以下:code

@Override
	public void run() {
		while (m_active) {
			/*
			 * make save message id index asyc
			 * 本地存储index,和 时间戳,防止重启,致使本地的消息id重了
			 */
			m_idfactory.saveMark();
			
			/**
			 * 检查本地初始化的服务列表跟远程的服务列表是否有差别,若是有差别,则取远程第一个能创建链接的server,创建一个新的链接,
			 * 关闭旧的链接
			 */
			checkServerChanged();

			ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
			List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();

			/**
			 * 检查当前channelFuture是否有消息积压(本地队列长度超过4990),或者 channelFuture不是开的
			 * @param activeFuture
			 */
			doubleCheckActiveServer(activeFuture);
			/**
			 * 从serverAddresses列表里面,重新顺序选一个,从新链接
			 */
			reconnectDefaultServer(activeFuture, serverAddresses);

			try {
				Thread.sleep(10 * 1000L); // check every 10 seconds
			} catch (InterruptedException e) {
				// ignore
			}
		}
	}

总结:服务端没有作到负载均衡,链接会慢慢链接到server列表里面第一个可用的server上。server

相关文章
相关标签/搜索