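This article is based on the Apache community release of Hive 2.1.0. Its goal is to examine how changes to the Metastore, its underlying RDBMS and other underlying services (for example, operational tasks such as version upgrades and service migrations) affect clients and users. Hive provides a client-side fault-tolerance mechanism that automatically reconnects to the Metastore when the connection times out. By tuning the relevant parameters we can adjust how long a service outage is tolerated, so that a service restarted within that window has no noticeable impact on users. Since we use MySQL, the common industry choice, as the Metastore's underlying RDBMS, the rest of the article uses MySQL in place of RDBMS for descriptions and tests. The relevant parameters are listed below.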
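Parameter | Default | Description | Applies to |
---|---|---|---|
hive.metastore.connect.retries | 3 | Number of attempts when the client establishes a connection to the Metastore | Metastore clients, such as the CLI and HiveServer2 |
hive.metastore.failure.retries | 1 | Number of retries after a client call to the Metastore fails | Metastore clients, such as the CLI and HiveServer2 |
hive.metastore.client.connect.retry.delay | 1s | Wait time between Metastore client reconnects/retries | Metastore clients, such as the CLI and HiveServer2 |
hive.metastore.client.socket.timeout | 600s | Metastore client socket timeout, passed to the underlying socket, which disconnects once the timeout expires | Metastore clients, such as the CLI and HiveServer2 |
hive.metastore.client.socket.lifetime | 0 | Socket lifetime. Once it has elapsed, the client closes the existing connection and reconnects the next time it accesses the Metastore; 0 disables proactive disconnection | Metastore clients, such as the CLI and HiveServer2 |
hive.hmshandler.retry.attempts | 10 | Number of retries after an error in the JDO datastore | Metastore |
hive.hmshandler.retry.interval | 2000ms | Interval between JDO retry attempts | Metastore |
hive.server2.thrift.client.connect.retry.limit | 1 | Number of attempts when the client establishes a connection to HiveServer2 | HiveServer2 clients, such as Beeline |
hive.server2.thrift.client.retry.limit | 1 | Number of retries after a client call to HiveServer2 fails | HiveServer2 clients, such as Beeline |
hive.server2.thrift.client.retry.delay.seconds | 1s | Wait time between HiveServer2 client reconnects/retries | HiveServer2 clients, such as Beeline |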
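The lifetime rule described for hive.metastore.client.socket.lifetime can be sketched as a small check run before each Metastore call. This is a minimal illustration under our own assumptions: ConnectionLifetime and its methods are hypothetical names, not Hive classes, and the current time is passed in explicitly so the logic is easy to test.

```java
// Hypothetical sketch of the hive.metastore.client.socket.lifetime rule from the table.
// ConnectionLifetime is an illustrative name, not a Hive class.
final class ConnectionLifetime {
  private final long lifetimeMillis; // 0 (or negative) disables proactive reconnects
  private long lastConnectionTime;   // when the current connection was (re)established

  ConnectionLifetime(long lifetimeMillis, long connectedAtMillis) {
    this.lifetimeMillis = lifetimeMillis;
    this.lastConnectionTime = connectedAtMillis;
  }

  /** True if the next Metastore call should drop the current connection and reconnect first. */
  boolean shouldReconnect(long nowMillis) {
    if (lifetimeMillis <= 0) {
      return false; // lifetime 0: keep the connection for as long as it works
    }
    return nowMillis - lastConnectionTime >= lifetimeMillis;
  }

  /** Records a fresh connection, restarting the lifetime clock. */
  void markReconnected(long nowMillis) {
    lastConnectionTime = nowMillis;
  }

  public static void main(String[] args) {
    ConnectionLifetime lt = new ConnectionLifetime(1000, 0);
    System.out.println(lt.shouldReconnect(999));  // false
    System.out.println(lt.shouldReconnect(1000)); // true
    lt.markReconnected(1000);
    System.out.println(lt.shouldReconnect(1500)); // false
  }
}
```

Because the check only runs when a call is made, an idle client never reconnects on its own. The reconnect happens lazily on the next request, which matches the table's wording.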
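To understand the difference between hive.metastore.connect.retries and hive.metastore.failure.retries, let's check the source code. Note: for readability, unrelated code is omitted with ...... in the excerpts that follow.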
Both the CLI and HiveServer2 interact with the Metastore through the org.apache.hadoop.hive.ql.metadata.Hive class. Let's start with the createDatabase(Database, boolean) method as an example of how this interaction works.
```java
/**
 * Create a database
 * @param db
 * @param ifNotExist if true, will ignore AlreadyExistsException exception
 * @throws AlreadyExistsException
 * @throws HiveException
 */
public void createDatabase(Database db, boolean ifNotExist)
    throws AlreadyExistsException, HiveException {
  try {
    getMSC().createDatabase(db);
  } catch (AlreadyExistsException e) {
    if (!ifNotExist) {
      throw e;
    }
  } catch (Exception e) {
    throw new HiveException(e);
  }
}

/**
 * @return the metastore client for the current thread
 * @throws MetaException
 */
@LimitedPrivate(value = {"Hive"})
@Unstable
public synchronized IMetaStoreClient getMSC(
    boolean allowEmbedded, boolean forceCreate) throws MetaException {
  if (metaStoreClient == null || forceCreate) {
    ......
    try {
      metaStoreClient = createMetaStoreClient(allowEmbedded);
    } catch (RuntimeException ex) {
      ......
    }
    ......
  }
  return metaStoreClient;
}
```
The Hive class holds an IMetaStoreClient object, obtained through getMSC(), which creates it lazily on first use. Next, let's look at how Hive creates an IMetaStoreClient object.
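Before following that path, the lazy creation in getMSC() can be illustrated on its own. It follows a common pattern: a synchronized getter builds the object on first use, or when forceCreate is set, and reuses it afterwards. In this minimal sketch, LazyClientHolder and its Supplier-based factory are hypothetical stand-ins for Hive and createMetaStoreClient.

```java
import java.util.function.Supplier;

// Minimal sketch of the lazy, synchronized getter pattern used by Hive.getMSC().
// LazyClientHolder and the Supplier factory are illustrative, not Hive APIs.
final class LazyClientHolder<T> {
  private final Supplier<T> factory; // stands in for createMetaStoreClient(...)
  private T client;                  // created on first use, then reused
  private int created;               // how many times the factory has run

  LazyClientHolder(Supplier<T> factory) {
    this.factory = factory;
  }

  /** Returns the cached client, creating it if absent or if forceCreate is set. */
  synchronized T get(boolean forceCreate) {
    if (client == null || forceCreate) {
      client = factory.get();
      created++;
    }
    return client;
  }

  synchronized int createdCount() {
    return created;
  }

  public static void main(String[] args) {
    LazyClientHolder<Object> holder = new LazyClientHolder<>(Object::new);
    Object a = holder.get(false);
    Object b = holder.get(false);
    Object c = holder.get(true);
    System.out.println(a == b);                // true: the cached instance is reused
    System.out.println(a == c);                // false: forceCreate built a new one
    System.out.println(holder.createdCount()); // 2
  }
}
```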
```java
// org.apache.hadoop.hive.ql.metadata.Hive.java
private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException {
  ......
  if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) {
    return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded);
  } else {
    return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
        SessionHiveMetaStoreClient.class.getName(), allowEmbedded);
  }
}
```
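The if branch (taken when hive.metastore.fastpath is enabled) creates a SessionHiveMetaStoreClient directly, without the retrying proxy, for a local Metastore embedded in the client. This is mainly used during development and debugging, so we focus only on the else branch, which creates the IMetaStoreClient object through RetryingMetaStoreClient.getProxy. After passing through a few simple overloads, RetryingMetaStoreClient.getProxy ends up in the following method: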
```java
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java
public static IMetaStoreClient getProxy(HiveConf hiveConf, Class<?>[] constructorArgTypes,
    Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
    String mscClassName) throws MetaException {

  @SuppressWarnings("unchecked")
  Class<? extends IMetaStoreClient> baseClass =
      (Class<? extends IMetaStoreClient>) MetaStoreUtils.getClass(mscClassName);

  RetryingMetaStoreClient handler =
      new RetryingMetaStoreClient(hiveConf, constructorArgTypes, constructorArgs,
          metaCallTimeMap, baseClass);
  return (IMetaStoreClient) Proxy.newProxyInstance(
      RetryingMetaStoreClient.class.getClassLoader(), baseClass.getInterfaces(), handler);
}
```
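This method uses the Java dynamic proxy mechanism to create and return a proxy that implements IMetaStoreClient, with RetryingMetaStoreClient as its invocation handler. From then on, every call on the IMetaStoreClient object is delegated to RetryingMetaStoreClient.invoke. Let's look at how RetryingMetaStoreClient.invoke handles the user's operations on the IMetaStoreClient object.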
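Before reading invoke, it may help to see the JDK dynamic proxy mechanism in isolation. DemoClient, DemoClientImpl and CountingHandler are hypothetical stand-ins for IMetaStoreClient, SessionHiveMetaStoreClient and RetryingMetaStoreClient. This handler only counts calls, where the real one wraps them in retry logic.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for IMetaStoreClient.
interface DemoClient {
  String createDatabase(String name);
}

// Hypothetical stand-in for SessionHiveMetaStoreClient, the "base" object.
class DemoClientImpl implements DemoClient {
  @Override
  public String createDatabase(String name) {
    return "created " + name;
  }
}

// Hypothetical stand-in for RetryingMetaStoreClient: every proxy call lands in invoke().
class CountingHandler implements InvocationHandler {
  private final Object base; // the real client each call is forwarded to
  int calls;                 // incremented on every intercepted call

  CountingHandler(Object base) {
    this.base = base;
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    calls++; // a real handler would add reconnect/retry logic around this call
    try {
      return method.invoke(base, args);
    } catch (InvocationTargetException e) {
      throw e.getCause(); // rethrow what the base client actually threw
    }
  }

  /** Builds a proxy implementing every interface of the base class, like baseClass.getInterfaces(). */
  DemoClient asProxy() {
    return (DemoClient) Proxy.newProxyInstance(
        DemoClient.class.getClassLoader(), base.getClass().getInterfaces(), this);
  }

  public static void main(String[] args) {
    CountingHandler handler = new CountingHandler(new DemoClientImpl());
    DemoClient client = handler.asProxy();
    System.out.println(client.createDatabase("db1")); // created db1
    System.out.println(handler.calls);                // 1
  }
}
```

Note that the proxy also routes toString, hashCode and equals through invoke. That is one reason a generic handler such as RetryingMetaStoreClient should not assume every call is a Metastore RPC.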
```java
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  Object ret = null;
  int retriesMade = 0;
  TException caughtException = null;
  while (true) {
    try {
      reloginExpiringKeytabUser();
      // 1. Decide whether to reconnect. A reconnect happens when:
      //    a) the previous iteration failed to access the Metastore with an exception
      //       type that supports automatic retry, or
      //    b) the connection lifetime has been reached (hive.metastore.client.socket.lifetime)
      if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
        base.reconnect();
        lastConnectionTime = System.currentTimeMillis();
      }
      if (metaCallTimeMap == null) {
        ret = method.invoke(base, args);
      } else {
        // need to capture the timing
        long startTime = System.currentTimeMillis();
        ret = method.invoke(base, args);
        long timeTaken = System.currentTimeMillis() - startTime;
        addMethodTime(method, timeTaken);
      }
      // 2. The Metastore call succeeded: return the result to the caller and leave the loop.
      //    Unless the user closes it, the underlying connection to the Metastore stays open.
      break;
    // 3. Handle exceptions raised while accessing the Metastore. There are three kinds:
    //    a) user-operation or metadata errors: thrown to the user, the loop ends
    //    b) connection errors, e.g. network problems or Metastore service failures
    //       (service stopped, connection limit exceeded): retried automatically, go to step 4
    //    c) other unknown errors: thrown to the user, the loop ends
    } catch (UndeclaredThrowableException e) {
      throw e.getCause();
    } catch (InvocationTargetException e) {
      Throwable t = e.getCause();
      if (t instanceof TApplicationException) {
        TApplicationException tae = (TApplicationException) t;
        switch (tae.getType()) {
          case TApplicationException.UNSUPPORTED_CLIENT_TYPE:
          case TApplicationException.UNKNOWN_METHOD:
          case TApplicationException.WRONG_METHOD_NAME:
          case TApplicationException.INVALID_PROTOCOL:
            throw t;
          default:
            caughtException = tae;
        }
      } else if ((t instanceof TProtocolException) || (t instanceof TTransportException)) {
        caughtException = (TException) t;
      } else if ((t instanceof MetaException) && t.getMessage().matches(
          "(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*") &&
          !t.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
        caughtException = (MetaException) t;
      } else {
        throw t;
      }
    } catch (MetaException e) {
      if (e.getMessage().matches("(?s).*(IO|TTransport)Exception.*") &&
          !e.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
        caughtException = e;
      } else {
        throw e;
      }
    }
    // 4. For retriable exceptions, count the retry and check it against the limit. If the limit
    //    is reached, throw the exception and leave the loop; otherwise log a warning (omitted in
    //    this excerpt), wait, and start the next iteration to retry the Metastore call.
    //    Retry limit: hive.metastore.failure.retries; wait: hive.metastore.client.connect.retry.delay
    if (retriesMade >= retryLimit) {
      throw caughtException;
    }
    retriesMade++;
    Thread.sleep(retryDelaySeconds * 1000);
  }
  return ret;
}

protected RetryingMetaStoreClient(HiveConf hiveConf, Class<?>[] constructorArgTypes,
    Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
    Class<? extends IMetaStoreClient> msClientClass) throws MetaException {
  this.retryLimit = hiveConf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES);
  this.retryDelaySeconds = hiveConf.getTimeVar(
      HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
  this.metaCallTimeMap = metaCallTimeMap;
  this.connectionLifeTimeInMillis = hiveConf.getTimeVar(
      HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS);
  ......
  this.base = (IMetaStoreClient) MetaStoreUtils.newInstance(
      msClientClass, constructorArgTypes, constructorArgs);
}
```
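The constructor shows that RetryingMetaStoreClient holds a base client, an instance of msClientClass (here SessionHiveMetaStoreClient, a subclass of HiveMetaStoreClient). When the upper layer invokes an operation on the proxy, such as createDatabase from the first step, the call is wrapped by RetryingMetaStoreClient.invoke and eventually reaches the method of the same name in HiveMetaStoreClient. RetryingMetaStoreClient.invoke encapsulates the automatic retry logic. When an exception occurs on the underlying connection to the Metastore, it retries automatically, and as long as a retry succeeds the user's operation in the upper layer is not affected.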
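Whether an exception counts as a connection problem is decided by its type and, for MetaException, by matching its message against regular expressions. The sketch reuses the two patterns from the excerpt. They differ: the pattern for a MetaException raised by the call itself also matches JDO errors, while the pattern for a MetaException thrown directly in the try block (for example, by reconnect()) matches IO errors. RetryClassifier is a hypothetical helper. The null check is our addition, because the excerpt calls getMessage().matches(...) directly.

```java
import java.util.regex.Pattern;

// Sketch of the MetaException retry decision in RetryingMetaStoreClient.invoke,
// reusing the regexes from the Hive 2.1.0 excerpt. RetryClassifier is hypothetical.
final class RetryClassifier {
  // MetaException thrown by the call itself (arrives wrapped in InvocationTargetException)
  private static final Pattern WRAPPED =
      Pattern.compile("(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*");
  // MetaException thrown directly inside the try block, e.g. by base.reconnect()
  private static final Pattern DIRECT =
      Pattern.compile("(?s).*(IO|TTransport)Exception.*");
  // Constraint violations are genuine data errors, so they are never retried
  private static final String CONSTRAINT =
      "java.sql.SQLIntegrityConstraintViolationException";

  static boolean retriableWrapped(String message) {
    return message != null && WRAPPED.matcher(message).matches() && !message.contains(CONSTRAINT);
  }

  static boolean retriableDirect(String message) {
    return message != null && DIRECT.matcher(message).matches() && !message.contains(CONSTRAINT);
  }

  public static void main(String[] args) {
    System.out.println(retriableWrapped(
        "javax.jdo.JDODataStoreException: Communications link failure")); // true
    System.out.println(retriableWrapped(
        "javax.jdo.JDODataStoreException: java.sql.SQLIntegrityConstraintViolationException")); // false
    System.out.println(retriableDirect("Database db1 already exists")); // false
  }
}
```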
The comments in the invoke method mark its main steps. They show that the number of retries is controlled by hive.metastore.failure.retries, and the wait between two retries by hive.metastore.client.connect.retry.delay.
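Without the Hive types, the retry loop reduces to the following sketch. The first attempt reuses the existing connection. Each retry first reconnects and then re-runs the request, with a sleep between attempts. The sketch assumes a hypothetical Session interface and treats IOException as the only retriable error. That is a simplification standing in for the TTransportException and MetaException checks in the excerpt.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Simplified sketch of the loop in RetryingMetaStoreClient.invoke.
// RetryLoop and Session are hypothetical; IOException stands in for retriable errors.
final class RetryLoop {
  interface Session {
    void reconnect() throws IOException; // stands in for base.reconnect()
  }

  static <T> T callWithRetries(Session session, Callable<T> call,
      int retryLimit, long retryDelayMillis) throws Exception {
    int retriesMade = 0;
    while (true) {
      try {
        if (retriesMade > 0) {
          session.reconnect(); // a retry = re-establish the session + re-run the request
        }
        return call.call();
      } catch (IOException e) { // retriable; any other exception propagates immediately
        if (retriesMade >= retryLimit) {
          throw e; // like hive.metastore.failure.retries being exhausted
        }
        retriesMade++;
        Thread.sleep(retryDelayMillis); // like hive.metastore.client.connect.retry.delay
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int[] failuresLeft = {2};
    int[] reconnects = {0};
    Session session = () -> reconnects[0]++;
    String result = callWithRetries(session, () -> {
      if (failuresLeft[0]-- > 0) {
        throw new IOException("simulated transport failure");
      }
      return "ok";
    }, 2, 0);
    System.out.println(result + " after " + reconnects[0] + " reconnects"); // ok after 2 reconnects
  }
}
```

With a retry limit of 2, the sketch tolerates two consecutive failures. Each of the two retries triggers exactly one reconnect.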
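Note that we are talking about a "retry" here, not a "reconnect". Each retry interacts with the Metastore in two steps: 1. re-establish the session with the Metastore; 2. execute the user request. Let's continue with how the client establishes a session with the Metastore.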
```java
// org.apache.hadoop.hive.metastore.HiveMetaStoreClient.java
@Override
public void reconnect() throws MetaException {
  ......
  close();
  // When multiple Metastores are configured, their order is randomly adjusted
  promoteRandomMetaStoreURI();
  open();
}

private void open() throws MetaException {
  isConnected = false;
  ......
  // hive.metastore.client.socket.timeout
  int clientSocketTimeout = (int) conf.getTimeVar(
      ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);

  for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
    for (URI store : metastoreUris) {
      try {
        transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
        ......
        try {
          transport.open();
          isConnected = true;
        } catch (TTransportException e) {
          ......
        }
        ......
      } catch (MetaException e) {
        ......
      }
      if (isConnected) {
        break;
      }
    }
    // Wait before launching the next round of connection retries.
    if (!isConnected && retryDelaySeconds > 0) {
      try {
        Thread.sleep(retryDelaySeconds * 1000);
      } catch (InterruptedException ignore) {}
    }
  }

  if (!isConnected) {
    throw new MetaException("Could not connect to meta store using any of the URIs provided." +
        " Most recent failure: " + StringUtils.stringifyException(tte));
  }
  ......
}

public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded)
    throws MetaException {
  ......
  // hive.metastore.connect.retries
  retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
  // hive.metastore.client.connect.retry.delay
  retryDelaySeconds = conf.getTimeVar(
      ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
  ......
  // Creating a HiveMetaStoreClient object tries to establish a long-lived session with the Metastore
  open();
}
```
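Like the retry logic in the previous step, the connection to the Metastore supports automatic reconnection. hive.metastore.connect.retries controls the number of connection attempts (rounds), and hive.metastore.client.connect.retry.delay controls the wait after each failed round. The connection itself is a Thrift RPC transport (TSocket).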
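If multiple Metastore addresses are configured, each round tries the Metastores one by one in list order and stops as soon as a session is established. reconnect() randomly adjusts this order before calling open().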
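The two nested loops in open() can be sketched as follows. There are up to `retries` rounds. Each round tries every URI in order, and a failed round is followed by a sleep, including the last one, as in the excerpt. Connector, ConnectLoop and the thrift://ms1, thrift://ms2 URIs are hypothetical. promoteRandom illustrates the random reordering mentioned in the reconnect() comment with a simple swap-to-front. That swap is our assumption about how the reordering is done, so check the Hive source if the exact order matters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the connection rounds in HiveMetaStoreClient.open().
final class ConnectLoop {
  interface Connector {
    boolean tryConnect(String uri); // true if a session to this URI was opened
  }

  /** Returns the URI that accepted the connection, or throws after all rounds fail. */
  static String open(List<String> uris, Connector connector, int retries, long delayMillis)
      throws InterruptedException {
    for (int attempt = 0; attempt < retries; ++attempt) { // hive.metastore.connect.retries
      for (String uri : uris) {
        if (connector.tryConnect(uri)) {
          return uri; // the first URI that accepts wins; the rest are not tried
        }
      }
      if (delayMillis > 0) {
        Thread.sleep(delayMillis); // hive.metastore.client.connect.retry.delay, after each failed round
      }
    }
    throw new IllegalStateException("Could not connect to any of " + uris);
  }

  /** Assumed reordering: swap a randomly chosen non-first URI to the front. */
  static void promoteRandom(List<String> uris, Random rng) {
    if (uris.size() <= 1) {
      return;
    }
    int index = rng.nextInt(uris.size() - 1) + 1;
    String first = uris.get(0);
    uris.set(0, uris.get(index));
    uris.set(index, first);
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> uris = new ArrayList<>(Arrays.asList("thrift://ms1:9083", "thrift://ms2:9083"));
    int[] calls = {0};
    // ms1 is down; ms2 only accepts from the second round on
    Connector connector = uri -> {
      calls[0]++;
      return uri.contains("ms2") && calls[0] > 2;
    };
    System.out.println(open(uris, connector, 3, 0)); // thrift://ms2:9083
    System.out.println(calls[0]);                    // 4
  }
}
```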
In addition, constructing a HiveMetaStoreClient object calls open() to try to establish a long-lived session with the Metastore, which subsequent user requests then reuse.
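Combining the two layers gives a rough estimate of how long a Metastore outage a single call on an already-connected client can ride out before the error reaches the user. Each failure retry sleeps once in invoke(), then reconnect() runs open(), which sleeps after every failed round. The final failed open() throws a MetaException whose message contains TTransportException, so it is treated as retriable until the limit is hit. This is our derivation from the excerpts, not a measured value. It assumes connection attempts are refused immediately and ignores request time; slow connection attempts (for example, to an unreachable host) would lengthen the window. OutageBudget is a hypothetical helper.

```java
// Back-of-the-envelope estimate derived from the excerpts; OutageBudget is not a Hive class.
// Assumes every reconnect attempt fails immediately and ignores request execution time.
final class OutageBudget {
  /**
   * @param failureRetries   hive.metastore.failure.retries
   * @param connectRetries   hive.metastore.connect.retries
   * @param retryDelayMillis hive.metastore.client.connect.retry.delay, in milliseconds
   * @return approximate time spent retrying before the call fails, in milliseconds
   */
  static long retryWindowMillis(int failureRetries, int connectRetries, long retryDelayMillis) {
    // Per failure retry: one sleep in invoke(), plus one sleep after each failed round in open().
    long perRetry = retryDelayMillis + (long) connectRetries * retryDelayMillis;
    return failureRetries * perRetry;
  }

  public static void main(String[] args) {
    System.out.println(retryWindowMillis(1, 3, 1000)); // defaults: 4000
    System.out.println(retryWindowMillis(3, 5, 5000)); // 3 * (5000 + 5 * 5000) = 90000
  }
}
```

With the defaults the window is only about 4 seconds, which suggests why raising these parameters matters if the Metastore is to be restarted without users noticing.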