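A chance conversation with a friend turned to MySQL multi-node cluster deployments and how our application code should use them, which led to the strategies the MySQL driver already supports. I realized my grasp of these concepts was fuzzy, so I sorted them out here for future reference. The focus is replication and loadbalance, using mysql-connector-java:5.1.38 as the example.
Official Java driver documentation: Multi-Host Connections
MySQL's multi-connection strategy is officially called server failover. The replication and loadbalance protocols both evolved on top of failover and correspond to different MySQL deployment architectures: the Cluster architecture and the Replication architecture. I won't go into those two here.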
jdbc:mysql://[primary host][:port],[secondary host 1][:port][,[secondary host 2][:port]]...[/[database]][?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]
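This is the official URL format: the primary host is the preferred connection to the primary database, and the secondary hosts are the replicas, tried for connection in the order listed. Many configuration properties are also supported (see the documentation for details).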
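To make the host ordering concrete, here is a minimal sketch that assembles a failover URL from an ordered host list. The class FailoverUrlSketch and its buildUrl method are hypothetical helpers for illustration, not part of Connector/J; the hosts and database name are the sample values used elsewhere in this post.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Connector/J: it only assembles a URL string.
class FailoverUrlSketch {

    // The first entry is treated as the primary host; the rest follow in the order given.
    static String buildUrl(List<String> hostPorts, String database, String properties) {
        StringBuilder url = new StringBuilder("jdbc:mysql://");
        url.append(String.join(",", hostPorts));
        url.append('/').append(database);
        if (properties != null && !properties.isEmpty()) {
            url.append('?').append(properties);
        }
        return url.toString();
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("127.0.0.1:3306", "11.12.3.44:3306");
        System.out.println(buildUrl(hosts, "mydb", "autoReconnect=false"));
    }
}
```

Reordering the list changes which host is attempted first, which is the only thing that distinguishes the primary from the secondaries in this URL form.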
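Inside the driver, the class NonRegisteringDriver.java defines the connection protocols MySQL supports; as the constants show, there are four. mxj means that MySQL switches to a different mysqld binary depending on the platform, with support for macOS, Linux, Windows, Solaris and so on. replication and loadbalance are covered below.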
private static final String REPLICATION_URL_PREFIX = "jdbc:mysql:replication://";
private static final String URL_PREFIX = "jdbc:mysql://";
private static final String MXJ_URL_PREFIX = "jdbc:mysql:mxj://";
public static final String LOADBALANCE_URL_PREFIX = "jdbc:mysql:loadbalance://";
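As a rough illustration of how these four prefixes separate the protocols, the sketch below classifies a URL by its prefix. The class UrlPrefixSketch, the method protocolOf and the returned labels are made up for this example; only the prefix strings come from the driver source quoted above.

```java
import java.util.Locale;

// Hypothetical classifier for illustration; only the prefix values come from the driver.
class UrlPrefixSketch {
    static final String REPLICATION_URL_PREFIX = "jdbc:mysql:replication://";
    static final String URL_PREFIX = "jdbc:mysql://";
    static final String MXJ_URL_PREFIX = "jdbc:mysql:mxj://";
    static final String LOADBALANCE_URL_PREFIX = "jdbc:mysql:loadbalance://";

    // Returns a label (invented for this sketch) naming the protocol a URL selects.
    static String protocolOf(String url) {
        String lower = url.toLowerCase(Locale.ROOT); // case-insensitive prefix match
        if (lower.startsWith(LOADBALANCE_URL_PREFIX)) {
            return "loadbalance";
        } else if (lower.startsWith(REPLICATION_URL_PREFIX)) {
            return "replication";
        } else if (lower.startsWith(MXJ_URL_PREFIX)) {
            return "mxj";
        } else if (lower.startsWith(URL_PREFIX)) {
            return "plain"; // single host or failover host list
        }
        return "unsupported";
    }

    public static void main(String[] args) {
        System.out.println(protocolOf("jdbc:mysql:replication://127.0.0.1:3306,11.12.3.44:3306/mydb"));
        System.out.println(protocolOf("jdbc:mysql://127.0.0.1:3306/mydb"));
    }
}
```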
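Looking at the driver types MySQL supports, there are three implementation classes in total: com.mysql.jdbc.Driver for regular use, com.mysql.jdbc.ReplicationDriver for master/slave mode, and org.gjt.mm.mysql.Driver, which exists for compatibility with MM.MySQL.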
The loadbalance strategy is triggered in three scenarios.
jdbc.driver=com.mysql.jdbc.ReplicationDriver
jdbc.url=jdbc:mysql:replication://127.0.0.1:3306,11.12.3.44:3306/mydb?autoReconnect=false
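The format is similar to failover. The main difference is that the first host is the master, in write/read mode, and all the hosts after it are slaves, in read mode. Configuration parameters are supported here as well; the official site describes them in detail.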
jdbc:mysql:replication://[master host][:port],[slave host 1][:port][,[slave host 2][:port]]...[/[database]][?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]
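The replication protocol is built on top of failover and loadbalance. It targets the Replication architecture and addresses read/write splitting and load balancing. When the connection is in read-only mode, requests are routed to a slave host; if there are multiple slaves, a round-robin strategy is used. All non-read-only requests (write/read) are routed to the master host. From version 5.1.27 on, multiple masters are supported, and the load balance strategy is applied among them; see the loadbalance protocol section for details. From version 5.1.28 on, nodes can be added dynamically, that is, new hosts can be added to the URL while the program is running, without a restart. This is the dynamic data source scenario we often talk about.
Read only here refers to a property of the connection established with the database, set through Connection.setReadOnly(false) or Connection.setReadOnly(true).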
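The routing rule can be pictured with a toy model. This is a sketch under simplifying assumptions, not the driver's implementation: ReplicationRoutingSketch and its methods are hypothetical names, hosts are plain strings, and the slave rotation advances on every call, whereas the real driver works with live connections.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of read/write routing; hypothetical names, no real connections involved.
class ReplicationRoutingSketch {
    private final String master;
    private final List<String> slaves;
    private int nextSlave = 0;
    private boolean readOnly = false;

    ReplicationRoutingSketch(String master, List<String> slaves) {
        this.master = master;
        this.slaves = slaves;
    }

    // Mirrors the role of Connection.setReadOnly(..) in selecting the target side.
    void setReadOnly(boolean readOnly) {
        this.readOnly = readOnly;
    }

    // Non-read-only work goes to the master; read-only work rotates through the slaves.
    String currentHost() {
        if (!readOnly || slaves.isEmpty()) {
            return master;
        }
        String host = slaves.get(nextSlave);
        nextSlave = (nextSlave + 1) % slaves.size(); // round-robin
        return host;
    }

    public static void main(String[] args) {
        ReplicationRoutingSketch router =
                new ReplicationRoutingSketch("127.0.0.1:3306", Arrays.asList("11.12.3.44:3306", "11.12.3.45:3306"));
        System.out.println(router.currentHost()); // master
        router.setReadOnly(true);
        System.out.println(router.currentHost()); // first slave
        System.out.println(router.currentHost()); // second slave
    }
}
```

The second slave address in main is invented for the demonstration; the post's own configuration lists only one slave.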
jdbc.driver=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql:loadbalance://127.0.0.1:3306,11.12.3.44:3306/mydb?autoReconnect=false&loadBalanceStrategy=bestResponseTime
jdbc:mysql:loadbalance://[host1][:port],[host2][:port][,[host3][:port]]...[/[database]][?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]
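The format is similar to failover and replication. All hosts are peers, with no primary or secondary distinction, and behavior can again be controlled through parameters.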
BalanceStrategy.java
public interface BalanceStrategy extends Extension {
    public abstract ConnectionImpl pickConnection(LoadBalancedConnectionProxy proxy, List<String> configuredHosts,
            Map<String, ConnectionImpl> liveConnections, long[] responseTimes, int numRetries) throws SQLException;
}
The loadbalance strategy is defined by the BalanceStrategy interface. The implementations that ship with the driver are BestResponseTimeBalanceStrategy, RandomBalanceStrategy (the default) and SequentialBalanceStrategy.
Following the source path Driver -> NonRegisteringDriver.connect(xx) -> NonRegisteringDriver.connectLoadBalanced() -> LoadBalancedConnectionProxy.createProxyInstance(), we can see that the default strategy is RandomBalanceStrategy.
RandomBalanceStrategy.java
int random = (int) Math.floor((Math.random() * whiteList.size()));
String hostPortSpec = whiteList.get(random);
ConnectionImpl conn = liveConnections.get(hostPortSpec);
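A self-contained sketch of the same idea: pick a random entry from the hosts that are not blacklisted. RandomPickSketch and pickHost are hypothetical names, and how the whitelist is derived here (configured hosts minus a blacklist set) is an assumption; the excerpt above does not show how the driver builds whiteList.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Simplified stand-in for random selection; works on host strings instead of connections.
class RandomPickSketch {

    // Returns a randomly chosen non-blacklisted host, or null when none is available.
    static String pickHost(List<String> configuredHosts, Set<String> blacklist) {
        List<String> whiteList = new ArrayList<>(configuredHosts);
        whiteList.removeAll(blacklist); // assumption: whitelist = configured hosts minus blacklist
        if (whiteList.isEmpty()) {
            return null;
        }
        int random = (int) Math.floor(Math.random() * whiteList.size());
        return whiteList.get(random);
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("127.0.0.1:3306", "11.12.3.44:3306");
        System.out.println(pickHost(hosts, Collections.<String>emptySet()));
    }
}
```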
BestResponseTimeBalanceStrategy.pickConnection(..)
for (int i = 0; i < responseTimes.length; i++) {
    long candidateResponseTime = responseTimes[i];

    if (candidateResponseTime < minResponseTime && !blackList.containsKey(configuredHosts.get(i))) {
        if (candidateResponseTime == 0) {
            bestHostIndex = i;
            break;
        }

        bestHostIndex = i;
        minResponseTime = candidateResponseTime;
    }
}
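The loop can be exercised in isolation with plain arrays. In this sketch, BestResponseTimeSketch and bestHostIndex are hypothetical names, the blacklist is a Set rather than the Map the driver uses, and returning -1 when every host is blacklisted is a choice made for this example only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Stand-alone version of the selection loop; hypothetical names, no driver classes.
class BestResponseTimeSketch {

    // Returns the index of the non-blacklisted host with the lowest response time, or -1 if none.
    static int bestHostIndex(List<String> configuredHosts, long[] responseTimes, Set<String> blackList) {
        long minResponseTime = Long.MAX_VALUE;
        int bestHostIndex = -1;
        for (int i = 0; i < responseTimes.length; i++) {
            long candidateResponseTime = responseTimes[i];
            if (candidateResponseTime < minResponseTime && !blackList.contains(configuredHosts.get(i))) {
                bestHostIndex = i;
                if (candidateResponseTime == 0) {
                    break; // a zero response time wins immediately, as in the excerpt
                }
                minResponseTime = candidateResponseTime;
            }
        }
        return bestHostIndex;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("h0", "h1", "h2");
        long[] times = { 30L, 10L, 20L };
        System.out.println(bestHostIndex(hosts, times, Collections.<String>emptySet()));
    }
}
```

Since a host with a recorded time of 0 is taken at once, the earliest such host in the list wins regardless of the other values.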
SequentialBalanceStrategy.pickConnection(..)
if (numHosts == 1) {
    this.currentHostIndex = 0; // pathological case
} else if (this.currentHostIndex == -1) {
    int random = (int) Math.floor((Math.random() * numHosts));

    for (int i = random; i < numHosts; i++) {
        if (!blackList.containsKey(configuredHosts.get(i))) {
            this.currentHostIndex = i;
            break;
        }
    }

    if (this.currentHostIndex == -1) {
        for (int i = 0; i < random; i++) {
            if (!blackList.containsKey(configuredHosts.get(i))) {
                this.currentHostIndex = i;
                break;
            }
        }
    }
    //...
} else {
    int i = this.currentHostIndex + 1;
    boolean foundGoodHost = false;

    for (; i < numHosts; i++) {
        if (!blackList.containsKey(configuredHosts.get(i))) {
            this.currentHostIndex = i;
            foundGoodHost = true;
            break;
        }
    }

    if (!foundGoodHost) {
        for (i = 0; i < this.currentHostIndex; i++) {
            if (!blackList.containsKey(configuredHosts.get(i))) {
                this.currentHostIndex = i;
                foundGoodHost = true;
                break;
            }
        }
    }
    //...
}
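The two-pass scans above amount to "start somewhere, then walk forward with wrap-around, skipping blacklisted hosts". The sketch below condenses that with modular arithmetic. SequentialPickSketch and pick are hypothetical names, and unlike the excerpt this version may select the current host again when it is the only one not blacklisted.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Condensed model of sequential selection; hypothetical names, host strings only.
class SequentialPickSketch {
    private int currentHostIndex = -1;

    // First call starts at a random position; later calls advance to the next usable host.
    int pick(List<String> configuredHosts, Set<String> blackList) {
        int numHosts = configuredHosts.size();
        int start = (currentHostIndex == -1)
                ? (int) Math.floor(Math.random() * numHosts)
                : (currentHostIndex + 1) % numHosts;
        for (int step = 0; step < numHosts; step++) {
            int i = (start + step) % numHosts; // wrap around the end of the list
            if (!blackList.contains(configuredHosts.get(i))) {
                currentHostIndex = i;
                return i;
            }
        }
        return -1; // every host is blacklisted
    }

    public static void main(String[] args) {
        SequentialPickSketch strategy = new SequentialPickSketch();
        List<String> hosts = Arrays.asList("h0", "h1", "h2");
        for (int n = 0; n < 4; n++) {
            System.out.println(hosts.get(strategy.pick(hosts, Collections.<String>emptySet())));
        }
    }
}
```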
NonRegisteringDriver.java
public class NonRegisteringDriver implements java.sql.Driver {
    // ...
    public java.sql.Connection connect(String url, Properties info) throws SQLException {
        // ...
        if (StringUtils.startsWithIgnoreCase(url, LOADBALANCE_URL_PREFIX)) {
            return connectLoadBalanced(url, info);
        } else if (StringUtils.startsWithIgnoreCase(url, REPLICATION_URL_PREFIX)) {
            return connectReplicationConnection(url, info);
        }
        // ...
    }
}
LoadBalancedConnectionProxy.java
public class LoadBalancedConnectionProxy extends MultiHostConnectionProxy implements PingTarget {

    /**
     * Creates a proxy for java.sql.Connection that routes requests between the given list of host:port and uses the given properties when creating connections.
     *
     * @param hosts
     *            The list of the hosts to load balance.
     * @param props
     *            Connection properties from where to get initial settings and to be used in new connections.
     * @throws SQLException
     */
    private LoadBalancedConnectionProxy(List<String> hosts, Properties props) throws SQLException {
        super();

        String group = props.getProperty("loadBalanceConnectionGroup", null);
        boolean enableJMX = false;
        String enableJMXAsString = props.getProperty("loadBalanceEnableJMX", "false");
        try {
            enableJMX = Boolean.parseBoolean(enableJMXAsString);
        } catch (Exception e) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceEnableJMX", new Object[] { enableJMXAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        if (group != null) {
            this.connectionGroup = ConnectionGroupManager.getConnectionGroupInstance(group);
            if (enableJMX) {
                ConnectionGroupManager.registerJmx();
            }
            this.connectionGroupProxyID = this.connectionGroup.registerConnectionProxy(this, hosts);
            hosts = new ArrayList<String>(this.connectionGroup.getInitialHosts());
        }

        // hosts specifications may have been reset with settings from a previous connection group
        int numHosts = initializeHostsSpecs(hosts, props);

        this.liveConnections = new HashMap<String, ConnectionImpl>(numHosts);
        this.hostsToListIndexMap = new HashMap<String, Integer>(numHosts);
        for (int i = 0; i < numHosts; i++) {
            this.hostsToListIndexMap.put(this.hostList.get(i), i);
        }
        this.connectionsToHostsMap = new HashMap<ConnectionImpl, String>(numHosts);
        this.responseTimes = new long[numHosts];

        String retriesAllDownAsString = this.localProps.getProperty("retriesAllDown", "120");
        try {
            this.retriesAllDown = Integer.parseInt(retriesAllDownAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForRetriesAllDown", new Object[] { retriesAllDownAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        String blacklistTimeoutAsString = this.localProps.getProperty(BLACKLIST_TIMEOUT_PROPERTY_KEY, "0");
        try {
            this.globalBlacklistTimeout = Integer.parseInt(blacklistTimeoutAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceBlacklistTimeout", new Object[] { retriesAllDownAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        String strategy = this.localProps.getProperty("loadBalanceStrategy", "random");
        if ("random".equals(strategy)) {
            this.balancer = (BalanceStrategy) Util.loadExtensions(null, props, "com.mysql.jdbc.RandomBalanceStrategy", "InvalidLoadBalanceStrategy", null)
                    .get(0);
        } else if ("bestResponseTime".equals(strategy)) {
            this.balancer = (BalanceStrategy) Util
                    .loadExtensions(null, props, "com.mysql.jdbc.BestResponseTimeBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0);
        } else {
            this.balancer = (BalanceStrategy) Util.loadExtensions(null, props, strategy, "InvalidLoadBalanceStrategy", null).get(0);
        }

        String autoCommitSwapThresholdAsString = props.getProperty("loadBalanceAutoCommitStatementThreshold", "0");
        try {
            this.autoCommitSwapThreshold = Integer.parseInt(autoCommitSwapThresholdAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceAutoCommitStatementThreshold",
                    new Object[] { autoCommitSwapThresholdAsString }), SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        String autoCommitSwapRegex = props.getProperty("loadBalanceAutoCommitStatementRegex", "");
        if (!("".equals(autoCommitSwapRegex))) {
            try {
                "".matches(autoCommitSwapRegex);
            } catch (Exception e) {
                throw SQLError.createSQLException(
                        Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceAutoCommitStatementRegex", new Object[] { autoCommitSwapRegex }),
                        SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
            }
        }

        if (this.autoCommitSwapThreshold > 0) {
            String statementInterceptors = this.localProps.getProperty("statementInterceptors");
            if (statementInterceptors == null) {
                this.localProps.setProperty("statementInterceptors", "com.mysql.jdbc.LoadBalancedAutoCommitInterceptor");
            } else if (statementInterceptors.length() > 0) {
                this.localProps.setProperty("statementInterceptors", statementInterceptors + ",com.mysql.jdbc.LoadBalancedAutoCommitInterceptor");
            }
            props.setProperty("statementInterceptors", this.localProps.getProperty("statementInterceptors"));
        }

        this.balancer.init(null, props);

        String lbExceptionChecker = this.localProps.getProperty("loadBalanceExceptionChecker", "com.mysql.jdbc.StandardLoadBalanceExceptionChecker");
        this.exceptionChecker = (LoadBalanceExceptionChecker) Util.loadExtensions(null, props, lbExceptionChecker, "InvalidLoadBalanceExceptionChecker", null)
                .get(0);

        pickNewConnection();
    }
}
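The part of this constructor that matters most for configuration is how property values fall back to defaults and how loadBalanceStrategy maps to a class name. The sketch below isolates that logic with java.util.Properties. LoadBalanceConfigSketch and its two methods are hypothetical; the property keys, default values and class names are copied from the constructor above, and com.example.MyStrategy is an invented custom strategy name.

```java
import java.util.Properties;

// Isolates the defaulting logic from the constructor; hypothetical class, no driver dependency.
class LoadBalanceConfigSketch {

    // "random" and "bestResponseTime" are shortcuts; any other value is taken as a class name.
    static String resolveStrategyClass(Properties props) {
        String strategy = props.getProperty("loadBalanceStrategy", "random");
        if ("random".equals(strategy)) {
            return "com.mysql.jdbc.RandomBalanceStrategy";
        } else if ("bestResponseTime".equals(strategy)) {
            return "com.mysql.jdbc.BestResponseTimeBalanceStrategy";
        }
        return strategy;
    }

    // Falls back to 120 when the property is absent, matching the default in the constructor.
    static int retriesAllDown(Properties props) {
        return Integer.parseInt(props.getProperty("retriesAllDown", "120"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolveStrategyClass(props)); // no property set, so the random strategy
        props.setProperty("loadBalanceStrategy", "com.example.MyStrategy"); // invented class name
        System.out.println(resolveStrategyClass(props));
    }
}
```

Because unknown values pass through as class names, a custom BalanceStrategy implementation can be plugged in by putting its fully qualified name in loadBalanceStrategy.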
Author: Owen Jia. His blog, Owen Blog, is worth following.