hive 2.1
I recently hit a scenario that required writing data into multiple partitions of one table. To shorten the execution time, multiple SQL statements were run concurrently, each writing a different partition, with dynamic partitioning enabled:
set hive.exec.dynamic.partition=true
insert overwrite table test_table partition(dt) select * from test_table_another where dt = 1;
The result: only one SQL statement actually ran; all the others hung.
Inspecting the thread stacks of the Hive Thrift Server showed every request blocked in DbTxnManager. The key Hive settings were:
hive.support.concurrency=true
hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
The defaults and documentation for these two settings:
org.apache.hadoop.hive.conf.HiveConf
HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false,
    "Whether Hive supports concurrency control or not. \n" +
    "A ZooKeeper instance must be up and running when using zookeeper Hive lock manager "),
HIVE_TXN_MANAGER("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager",
    "Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive\n" +
    "transactions, which also requires appropriate settings for hive.compactor.initiator.on,\n" +
    "hive.compactor.worker.threads, hive.support.concurrency (true), hive.enforce.bucketing\n" +
    "(true), and hive.exec.dynamic.partition.mode (nonstrict).\n" +
    "The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides\n" +
    "no transactions."),
For a detailed walkthrough of how Hive executes SQL, see: http://www.javashuo.com/article/p-opeaomay-eo.html
Every SQL statement Hive executes eventually goes through Driver.run, which calls runInternal. Here is the relevant runInternal code:
org.apache.hadoop.hive.ql.Driver
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
    throws CommandNeedRetryException {
  ...
  if (requiresLock()) {
    // a checkpoint to see if the thread is interrupted or not before an expensive operation
    if (isInterrupted()) {
      ret = handleInterruption("at acquiring the lock.");
    } else {
      ret = acquireLocksAndOpenTxn(startTxnImplicitly);
    }
  ...

private boolean requiresLock() {
  if (!checkConcurrency()) {
    return false;
  }
  // Lock operations themselves don't require the lock.
  if (isExplicitLockOperation()) {
    return false;
  }
  if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_LOCK_MAPRED_ONLY)) {
    return true;
  }
  Queue<Task<? extends Serializable>> taskQueue = new LinkedList<Task<? extends Serializable>>();
  taskQueue.addAll(plan.getRootTasks());
  while (taskQueue.peek() != null) {
    Task<? extends Serializable> tsk = taskQueue.remove();
    if (tsk.requireLock()) {
      return true;
    }
  ...

private boolean checkConcurrency() {
  boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
  if (!supportConcurrency) {
    LOG.info("Concurrency mode is disabled, not creating a lock manager");
    return false;
  }
  return true;
}

private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) {
  ...
  txnMgr.acquireLocks(plan, ctx, userFromUGI);
  ...
runInternal calls requiresLock to decide whether a lock is needed; requiresLock makes two checks: whether concurrency support is enabled (checkConcurrency), and whether any task in the plan requires a lock.
If a lock is needed, runInternal calls acquireLocksAndOpenTxn, which calls HiveTxnManager.acquireLocks to take the lock.
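The branching in requiresLock can be condensed into a small stand-alone sketch. The helper below and its boolean parameters are invented for illustration; they only mirror the order of checks in the real Hive code, not its API:

```java
// Simplified model of Driver.requiresLock's decision flow (not the real Hive code):
// a lock is taken only when concurrency support is on and nothing short-circuits it.
public class LockDecisionSketch {

    static boolean requiresLock(boolean supportConcurrency,
                                boolean isExplicitLockOperation,
                                boolean lockMapredOnly,
                                boolean anyTaskRequiresLock) {
        // hive.support.concurrency=false -> no lock manager at all
        if (!supportConcurrency) return false;
        // explicit LOCK/UNLOCK statements skip this path
        if (isExplicitLockOperation) return false;
        // hive.lock.mapred.only.operation=false -> every query locks
        if (!lockMapredOnly) return true;
        // otherwise lock only if some task in the plan asks for it
        return anyTaskRequiresLock;
    }

    public static void main(String[] args) {
        // With the default hive.support.concurrency=false, no lock is taken.
        System.out.println(requiresLock(false, false, false, true));  // false
        // With concurrency on (as in this incident), the insert takes a lock.
        System.out.println(requiresLock(true, false, false, true));   // true
    }
}
```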
1) First, which tasks need a lock:
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer
private void analyzeAlterTablePartMergeFiles(ASTNode ast, String tableName,
    HashMap<String, String> partSpec) throws SemanticException {
  ...
  DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), mergeDesc);
  ddlWork.setNeedLock(true);
  ...
So DDL operations require a lock.
2) Next, how the lock is acquired:
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
  try {
    acquireLocksWithHeartbeatDelay(plan, ctx, username, 0);
  ...

void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay)
    throws LockException {
  LockState ls = acquireLocks(plan, ctx, username, true);
  ...

LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking)
    throws LockException {
  ...
  switch (output.getType()) {
    case DATABASE:
      compBuilder.setDbName(output.getDatabase().getName());
      break;

    case TABLE:
    case DUMMYPARTITION:  // in case of dynamic partitioning lock the table
      t = output.getTable();
      compBuilder.setDbName(t.getDbName());
      compBuilder.setTableName(t.getTableName());
      break;

    case PARTITION:
      compBuilder.setPartitionName(output.getPartition().getName());
      t = output.getPartition().getTable();
      compBuilder.setDbName(t.getDbName());
      compBuilder.setTableName(t.getTableName());
      break;

    default:
      // This is a file or something we don't hold locks for.
      continue;
  }
  ...
  LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks);
  ctx.setHiveLocks(locks);
  return lockState;
}
As the code shows, when dynamic partitioning is enabled the write entity is a DUMMYPARTITION, so the lock is taken at DbName + TableName granularity. Multiple SQL statements writing the same table therefore contend for one table-level lock: only one gets it, and the rest wait.
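To make the granularity difference concrete, here is a toy model. The lockKey helper is invented for illustration; it only mimics which fields acquireLocks puts into the lock component for each case, not Hive's real lock manager:

```java
import java.util.Objects;

// Toy model of the lock-component naming in acquireLocks (not Hive's real API):
// a dynamic-partition write is a DUMMYPARTITION, so its key collapses to db+table;
// a static-partition write keys on db+table+partition.
public class LockKeySketch {

    // hypothetical helper: partition == null stands for a dynamic-partition write
    static String lockKey(String db, String table, String partition) {
        return partition == null ? db + "/" + table
                                 : db + "/" + table + "/" + partition;
    }

    public static void main(String[] args) {
        // two concurrent dynamic-partition inserts: identical keys -> they contend
        String a = lockKey("default", "test_table", null);
        String b = lockKey("default", "test_table", null);
        System.out.println(Objects.equals(a, b));  // true: same lock, second waits

        // two static-partition inserts into different partitions: distinct keys
        String c = lockKey("default", "test_table", "dt=1");
        String d = lockKey("default", "test_table", "dt=2");
        System.out.println(Objects.equals(c, d));  // false: no table-wide contention
    }
}
```

This is why serializing on the table lock disappears as soon as each statement names its partition explicitly.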
There are a few ways to solve the problem:
1) drop dynamic partitioning and write each partition with a static partition spec, e.g. partition(dt = 1);
2) set hive.support.concurrency=false, so no lock manager is created;
3) switch hive.txn.manager back to the default org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager.
Any of the three works; the first is recommended, because in the scenario above dynamic partitioning was not actually needed.