JedisClusterConnectionHandler提供了JedisCluster接口获取资源池中Jedis链接对象的一个门面类,JedisClusterConnectionHandler提供了初始化集群,获取资源池中链接对象,刷新资源池等各类方法。JedisClusterConnectionHandler必须依赖于JedisClusterInfoCache类,由于 JedisClusterConnectionHandler自己的方法都是基于JedisClusterInfoCache类实现的,下列所讲解的方法的底层实现会在JedisClusterInfoCache进行讲述。node
JedisClusterConnectionHandler实例化时必须提供如下的参数(见代码1-1),这些参数自己对于JedisClusterConnectionHandler没什么用处,彻底就是在该类的构造器中走了一遭,由于JedisClusterConnectionHandler内部维护着JedisClusterInfoCache实例。JedisClusterConnectionHandler将这些参数传递给cache实例后,经过initializeSlotsCache()方法获取集群信息,并调用cache.discoverClusterNodesAndSlots()将获取到的集群信息分别放入nodes缓存和slots缓存中。这两个缓存的存储形式为 Map<String, JedisPool>,Map<Integer, JedisPool> ,虽然都是Map,可是存储的数据内容彻底不一样,nodes存储着集群全部节点信息,存储形式为<"host:port",JedisPool>,而slots存储的信息为<slot,JedisPool>,而且slots只存储主节点信息。redis
从代码1-2中看出,经过遍历节点集合,实例化Jedis对象,再调用cache.discoverClusterNodesAndSlots(jedis)方法将节点放入缓存Map中,获取集群信息的方法也很简单,经过发送'cluster slots'便知道槽和节点的对应状况,再将Redis服务端返回来的输入流解析成Java对象便可。这里注明下,发现集群情况只须要一个可用的Jedis实例便可,可是集群节点总会发生"挂掉"的状况,建议在使用JedisCluster时配上全部的节点信息。后端
代码1-1缓存
public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) { //实例化集群信息缓存
this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap); initializeSlotsCache(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier); }
代码1-2 集群发现及初始化缓存池dom
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) { //遍历节点集合,拿出一个可用的节点获取集群信息
for (HostAndPort hostAndPort : startNodes) { Jedis jedis = null; try { jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier); //添加密码认证
if (password != null) { jedis.auth(password); } //设置客户端名称
if (clientName != null) { jedis.clientSetname(clientName); } //TODO 根据节点发现集群信息
cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes
} finally { if (jedis != null) { jedis.close(); } } } }
JedisClusterConnectionHandler提供了四种获取可用节点的方法,其中 getConnection()和 getConnectionFromSlot(int slot);被定义为抽象方法,交由JedisSlotBasedConnectionHandler子类进行实现。值得注意的是,在获取Jedis链接时,有两个比较重要的点须要说明一下。前面说过JedisClusterInfoCache存放着nodes和slots两个map对象,一般集群模式都会作主从分离,即master节点负责读写操做,slave节点负责读和备份.若是尝试对slave节点进行写操做,那么会发生写入错误。因此提供了getConnectionFromSlot(int slot) 方法来根据槽点值获取对应的主节点信息,若是此时后端集群发生主从切换,该方法会尝试刷新集群状态来保证获取可用的Jedis对象。若是集群频繁的出现切换,那么即便刷新了集群信息也不能保证获取到的Jedis对象必定是有效的,代码中也作了说明"It can't guaranteed to get valid connection because of node assignment(由于节点的分配问题并不能获取到有效链接)",因此代码中采用了重试机制和随机数来尽可能保证获取到的是有效的Jedis链接。第二个点是Jedis包依赖于Apache-CommonsPool2来作Jedis的二级缓存,为了保证Jedis是有效的,Jedis包下的JedisFactory实现了PooledObjectFactory相关方法,经过配置DEFAULT_TEST_ON_CREATE,DEFAULT_TEST_ON_BORROW,DEFAULT_TEST_ON_RETURN,DEFAULT_NUM_TESTS_PER_EVICTION_RUN 参数保证在建立阶段,借用阶段,归还阶段时Jedis对象都是可用的。源码分析
代码2-1:从缓存池中获取Jedis对象ui
public Jedis getConnectionFromNode(HostAndPort node) { //使用二级缓存,先从JedisClusterInfoCache中获取缓存对象,再去commons-pool包下获取Jedis对象
return cache.setupNodeIfNotExist(node).getResource(); } public Map<String, JedisPool> getNodes() { //直接从JedisClusterInfoCache 的nodes缓存中获取全部JedisPool对象
return cache.getNodes(); } public Jedis getConnection() { // In antirez's redis-rb-cluster implementation, // getRandomConnection always return valid connection (able to // ping-pong) // or exception if all connections are invalid //随机获取
List<JedisPool> pools = cache.getShuffledNodesPool(); /** * 遍历缓存池中的每一个池对象 */
for (JedisPool pool : pools) { Jedis jedis = null; try { jedis = pool.getResource(); if (jedis == null) { continue; } String result = jedis.ping();//尝试进行ping
if (result.equalsIgnoreCase("pong")) return jedis;//返回可用链接
jedis.close(); } catch (JedisException ex) { if (jedis != null) { jedis.close(); } } } //遍历完成后仍然没法获取有效实例
throw new JedisNoReachableClusterNodeException("No reachable node in cluster"); } public Jedis getConnectionFromSlot(int slot) { //从slots中获取
JedisPool connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { // It can't guaranteed to get valid connection because of node // assignment
return connectionPool.getResource(); } else { //刷新集群信息
renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { //TODO
return connectionPool.getResource(); } else { //no choice, fallback to new connection to random node //回到随机获取
return getConnection(); } } }
JedisClusterConnectionHandler提供了三个方法来进行集群信息的刷新和关闭操做,详细的说明见《Jedis源码分析:JedisClusterInfoCache》this