这篇文章的目的主要是讲解TC的在处理分支事务注册过程当中对全局锁的处理流程,理解了全局锁之后才能明白对DB同一个记录进行屡次变动是如何解决的。git
如上图所示,问最终全局事务A对资源R1应该回滚到哪一种状态?很明显,若是再根据UndoLog去作回滚,就会发生严重问题:覆盖了全局事务B对资源R1的变动。
那Fescar是如何解决这个问题呢?答案就是 Fescar的全局写排它锁解决方案,在全局事务A执行过程当中全局事务B会由于获取不到全局锁而处于等待状态。github
public class ConnectionProxy extends AbstractConnectionProxy { public void commit() throws SQLException { if (context.inGlobalTransaction()) { try { // 一、向TC发起注册操做并检查是否可以获取全局锁 register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e); } try { if (context.hasUndoLog()) { UndoLogManager.flushUndoLogs(this); } // 二、执行本地的事务的commit操做 targetConnection.commit(); } catch (Throwable ex) { report(false); if (ex instanceof SQLException) { throw (SQLException) ex; } else { throw new SQLException(ex); } } report(true); context.reset(); } else { targetConnection.commit(); } } private void register() throws TransactionException { Long branchId = DataSourceManager.get().branchRegister( BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), context.buildLockKeys()); context.setBranchId(branchId); } }
说明:session
public class DefaultCore implements Core { protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException { response.setTransactionId(request.getTransactionId()); response.setBranchId( core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(), XID.generateXID(request.getTransactionId()), request.getLockKey())); } public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException { GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin); BranchSession branchSession = new BranchSession(); branchSession.setTransactionId(XID.getTransactionId(xid)); branchSession.setBranchId(UUIDGenerator.generateUUID()); branchSession.setApplicationId(globalSession.getApplicationId()); branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup()); branchSession.setBranchType(branchType); branchSession.setResourceId(resourceId); branchSession.setLockKey(lockKeys); branchSession.setClientId(clientId); // 判断branchSession是否可以获取锁 if (!branchSession.lock()) { throw new TransactionException(LockKeyConflict); } try { globalSession.addBranch(branchSession); } catch (RuntimeException ex) { throw new TransactionException(FailedToAddBranch); } return branchSession.getBranchId(); } public boolean lock() throws TransactionException { return LockManagerFactory.get().acquireLock(this); } }
说明:数据结构
public class DefaultLockManagerImpl implements LockManager { public boolean acquireLock(BranchSession branchSession) throws TransactionException { String resourceId = branchSession.getResourceId(); long transactionId = branchSession.getTransactionId(); //一、根据resourceId去LOCK_MAP获取,获取失败则新增一个空的对象。 ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId); if (dbLockMap == null) { LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>()); dbLockMap = LOCK_MAP.get(resourceId); } ConcurrentHashMap<Map<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder(); // 二、获取branchSession的全局锁的key对象 String lockKey = branchSession.getLockKey(); if(StringUtils.isEmpty(lockKey)) { return true; } // 三、按照分号“;”切割多个LockKey,每一个LockKey按照table:pk1;pk2;pk3格式组装。 String[] tableGroupedLockKeys = lockKey.split(";"); for (String tableGroupedLockKey : tableGroupedLockKeys) { int idx = tableGroupedLockKey.indexOf(":"); if (idx < 0) { branchSession.unlock(); throw new ShouldNeverHappenException("Wrong format of LOCK KEYS: " + branchSession.getLockKey()); } // 四、分割获取branchRegister请求的表名和pks。 String tableName = tableGroupedLockKey.substring(0, idx); String mergedPKs = tableGroupedLockKey.substring(idx + 1); // 五、获取表下的已经加锁的记录tableLockMap ConcurrentHashMap<Integer, Map<String, Long>> tableLockMap = dbLockMap.get(tableName); if (tableLockMap == null) { dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, Map<String, Long>>()); tableLockMap = dbLockMap.get(tableName); } // 六、遍历该表全部pks判断是否已加锁。 String[] pks = mergedPKs.split(","); for (String pk : pks) { // 七、同一个表的pk按照hash值进行hash分配到tableLockMap当中。 int bucketId = pk.hashCode() % BUCKET_PER_TABLE; Map<String, Long> bucketLockMap = tableLockMap.get(bucketId); if (bucketLockMap == null) { tableLockMap.putIfAbsent(bucketId, new HashMap<String, Long>()); bucketLockMap = tableLockMap.get(bucketId); } // 八、根据pk去获取bucketLockMap当中获取锁对象。 synchronized (bucketLockMap) { Long lockingTransactionId = bucketLockMap.get(pk); if (lockingTransactionId == null) { // No existing lock // 九、将锁添加到branchSession当中 bucketLockMap.put(pk, transactionId); Set<String> keysInHolder = bucketHolder.get(bucketLockMap); if (keysInHolder == null) { bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>()); keysInHolder = bucketHolder.get(bucketLockMap); } keysInHolder.add(pk); } else if (lockingTransactionId.longValue() == transactionId) { // Locked by me continue; } else { // 直接返回异常 LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId); branchSession.unlock(); // Release all acquired locks. return false; } } } } return true; } }
说明:app
private static final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>> LOCK_MAP = new ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>>();
说明:ui
- Github:https://github.com/alibaba/fescarthis
- 官方中文介绍:https://github.com/alibaba/fescar/wikispa