在NC系列中,咱们发布了一个 EJB接口后,对接口的调用 事务的commit 和 rollback是全自动的 通常非特殊需求下无需手工处理,那么NC如何实现的事务无感知 处理呢? java
上文说道 事务的代理是 nc.bs.framework.ejb.CMTEJBServiceHandler 类 转发给 delegate,咱们去调试下 看看 他如何实现事务开启结束子事务等:sql
nc.itf.framework.ejb.CMTProxy_Local 咱们进入看看:数据库
第一个是 开一个全新子事务,第二个是继承已有事务 没有就新开。app
protected void beforeCallMethod(int methodId) { Logger.info("Begin Transaction(" + methodId + ")"); MwTookit.setThreadState("nc.bs.mw.naming.BeanBase.beforeCallMethod"); this.setLastCallTime(System.currentTimeMillis()); boolean isCmt = ((HomeBase)this.getEJBLocalHome()).getEJBBeanDescriptor().isCmt(); if (isCmt) { try { this.currentMethodTransectionType = this.getMethodTransectionType(methodId); int isolateLevel = this.getMethodIsolateLevelType(methodId); this.setIerpTransactionManagerProxy(TransactionFactory.getTMProxy()); this.getIerpTransactionManagerProxy().begin(this.currentMethodTransectionType, isolateLevel); } catch (Exception var4) { Logger.error("BeforeCallMethod", var4); } } else { if (this.getIerpUserTransaction() == null) { this.setIerpTransactionManagerProxy((IContainerTransProxy)null); this.setIerpUserTransaction(TransactionFactory.getUTransaction()); } this.getIerpUserTransaction().bindToCurrentThread(); } MwTookit.setThreadState("nc.bs.mw.naming.BeanBase.beforeCallMethod over"); }
2个都开头第一行调用了 这个方法, 新开子事务 200, 继承事务穿的 201. 咱们看看这个方法的核心 this.getIerpTransactionManagerProxy().begin(this.currentMethodTransectionType, isolateLevel); 干了什么:dom
public void begin(int transType, int isolateLevel) throws NotSupportedException, SystemException, NamingException, TransactionRequiredException { IUAPTransactionManager m_tranManager = (IUAPTransactionManager)tm_local.get(); if (m_tranManager == null) { m_tranManager = new UAPTransactionManager(); tm_local.set(m_tranManager); } ((IUAPTransactionManager)m_tranManager).begin(transType); }
从当前的线程去除 事务管理器,没有就new一个新的放入线程,而后直接 调用了 begin。 这里取一次的缘由是 由于是子事务 若是以前有事务 他须要取以前的事务,而后用以前的事务管理器 begin,由于他要统一事务管理器就一个! 咱们继续:ui
public void begin(int transType) throws NotSupportedException, SystemException { switch(transType) { case 0: this.createTransaction(TransactionContextType.NULL); break; case 1: if (this.tranStack.isEmpty()) { this.createTransaction(TransactionContextType.SOURCE); } else { this.createTransaction(TransactionContextType.JOINED); } break; case 2: if (!this.tranStack.isEmpty()) { this.createTransaction(TransactionContextType.NULL); } else { this.createTransaction(TransactionContextType.JOINED); } break; case 3: this.createTransaction(TransactionContextType.SOURCE); break; case 4: if (this.tranStack.isEmpty()) { throw new SystemException(); } this.createTransaction(TransactionContextType.JOINED); break; case 5: if (!this.tranStack.isEmpty()) { throw new SystemException(); } this.createTransaction(TransactionContextType.NULL); break; case 6: case 7: case 8: case 9: case 10: default: throw new NotSupportedException("trans type error!"); case 11: this.createTransaction(TransactionContextType.JOINED); try { this.setCurInvokeSavePoint(); } catch (SQLException var3) { throw new NotSupportedException("savePoint error!"); } } }
这里根据事务的开启类型不一样 进入不一样的逻辑,通常是NULL(无事务),JOINED(新支持事务的接口方法被调用 事务嵌套),SOURCE(第一个新事物) 3种,咱们发现 最终事务管理仍是交给了 uap.mw.trans.UAPTransaction类,this
这个类里 有个关键的 uap.mw.ds.DBConnection 为Value的Map对象, 这个uap.mw.ds.DBConnection是NC对Jdbc最底层的 Connection对象的包裹对象:线程
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package uap.mw.ds; import java.sql.Array; import java.sql.Blob; import java.sql.CallableStatement; import java.sql.Clob; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.NClob; import java.sql.PreparedStatement; import java.sql.SQLClientInfoException; import java.sql.SQLException; import java.sql.SQLWarning; import java.sql.SQLXML; import java.sql.Savepoint; import java.sql.Statement; import java.sql.Struct; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.Map.Entry; import java.util.concurrent.Executor; import nc.bs.logging.Log; import nc.bs.logging.Logger; import nc.bs.mw.sql.validator.ValidateTool; import uap.mw.ds.exception.DBRunTimeException; import uap.mw.ds.monitor.ConnectionExeInfo; import uap.mw.trans.UAPTransactionManagerProxy; import uap.mw.trans.util.ConnectionStatus; public class DBConnection implements Connection { private static Log log = Log.getInstance(DBConnection.class); private volatile ConnectionStatus status; private Connection realConnection; private UAPDataSource ds; public Exception trace; public Thread lastInUseThread; private long activeTime; private long lastInUseTime; private ConnectionExeInfo lastRunTimeInfo; private ConnectionExeInfo curRunTimeInfo; private String connID; private int prepareStatementNum; private Map<String, UAPPreparedStatement> statementCached; private Map<Integer, UAPPreparedStatement> stmtTracksMap; private Connection getRealConnection() { if (this.isClosed()) { throw new DBRunTimeException(" connection is Closed"); } else { return this.realConnection; } } public Connection getPhysicalConnection() { return this.realConnection; } DBConnection(Connection realConnection, final UAPDataSource ds) { this.status = ConnectionStatus.FREE; this.realConnection = null; this.ds = null; this.activeTime = -1L; this.lastInUseTime = -1L; this.connID = ""; this.prepareStatementNum = 0; this.statementCached = null; this.stmtTracksMap = new HashMap(); this.realConnection = realConnection; this.ds = ds; this.connID = UUID.randomUUID().toString(); this.statementCached = new LinkedHashMap<String, UAPPreparedStatement>(ds.getPreparedStatementCacheSize(), 1.0F, true) { public UAPPreparedStatement remove(Object sql) { UAPPreparedStatement stmt = (UAPPreparedStatement)super.remove(sql); if (stmt != null) { stmt.isPooled = false; } return stmt; } protected boolean removeEldestEntry(Entry<String, UAPPreparedStatement> eldest) { boolean needRemove = this.size() > ds.getPreparedStatementCacheSize(); if (needRemove) { ((UAPPreparedStatement)eldest.getValue()).isPooled = false; if (!((UAPPreparedStatement)eldest.getValue()).inUse) { DBConnection.this.closeStmt((Statement)eldest.getValue()); } } return needRemove; } }; } public ConnectionStatus getStatus() { return this.status; } public void setAutoCommit(boolean b) throws SQLException { this.getRealConnection().setAutoCommit(b); } void setStatus(ConnectionStatus status) { this.status = status; } public void setLastUseMeThread(Thread newLastUseMeThread) { if (newLastUseMeThread == null) { this.trace = null; this.lastInUseTime = -1L; } else { this.trace = new Exception(); this.lastInUseTime = System.currentTimeMillis(); } this.lastInUseThread = newLastUseMeThread; } public void commit() throws SQLException { if (!UAPTransactionManagerProxy.isTrans()) { this.realCommit(); } } public void realCommit() throws SQLException { this.getRealConnection().commit(); this.clearStatements(); } public void rollback() throws SQLException { if (!UAPTransactionManagerProxy.isTrans()) { throw new SQLException("no Transaction ,can not ROLLBACK!"); } } public void rollback(Savepoint savepoint) throws SQLException { if (!UAPTransactionManagerProxy.isTrans()) { throw new SQLException("no Transaction ,can not ROLLBACK!"); } } public void realRollback() throws SQLException { this.getRealConnection().rollback(); } public void realRollback(Savepoint savepoint) throws SQLException { this.getRealConnection().rollback(savepoint); } public void close() throws SQLException { if (!UAPTransactionManagerProxy.isTrans()) { if (this.getStatus() != ConnectionStatus.INUSE) { throw new SQLException("Connection state Error:the connection state is " + this.getStatus()); } if (!this.getAutoCommit()) { this.commit(); } this.backPool(); } } public void backPool() throws SQLException { this.clearStatements(); this.ds.closeConnection(this); } public <T> T unwrap(Class<T> iface) throws SQLException { return this.getRealConnection().unwrap(iface); } public boolean isWrapperFor(Class<?> iface) throws SQLException { return this.getRealConnection().isWrapperFor(iface); } public Statement createStatement() throws SQLException { this.updateActivetime(); return new UAPStatement(this.getRealConnection().createStatement(), this.curRunTimeInfo); } public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { return new UAPStatement(this.getRealConnection().createStatement(resultSetType, resultSetConcurrency), this.curRunTimeInfo); } private UAPPreparedStatement prepareCachedStatement(String sql) throws SQLException { UAPPreparedStatement ps = new UAPPreparedStatement(this, this.getRealConnection().prepareStatement(sql), this.curRunTimeInfo, true, this.prepareStatementNum++); this.statementCached.put(sql, ps); return ps; } public PreparedStatement prepareStatement(String sql) throws SQLException { UAPPreparedStatement ps = null; if (this.ds.getPreparedStatementCacheSize() <= 0) { ps = new UAPPreparedStatement(this, this.getRealConnection().prepareStatement(sql), this.curRunTimeInfo, false, 0); } else { ps = (UAPPreparedStatement)this.statementCached.get(sql); if (ps == null) { ps = this.prepareCachedStatement(sql); } else if (ps.inUse) { this.statementCached.remove(sql); ps = this.prepareCachedStatement(sql); } else { try { ps.clearParameters(); } catch (SQLException var4) { this.statementCached.remove(sql); this.closeStmt(ps); ps = this.prepareCachedStatement(sql); } } } ps.inUse = true; this.stmtTracksMap.put(ps.getNum(), ps); return ps; } private void clearStatements() { UAPPreparedStatement s; try { for(Iterator i$ = this.stmtTracksMap.values().iterator(); i$.hasNext(); this.closeStmt(s)) { s = (UAPPreparedStatement)i$.next(); if (s.inUse) { Logger.warn("statement in use!"); } } } finally { this.prepareStatementNum = 0; this.stmtTracksMap.clear(); } } private void closeStmt(Statement stmt) { try { if (stmt != null) { stmt.close(); stmt = null; } } catch (SQLException var3) { } } public void removeStatementFromIndex(Integer statementNum) { this.stmtTracksMap.remove(statementNum); } /** @deprecated */ @Deprecated public CallableStatement prepareCall(String sql) throws SQLException { return this.getRealConnection().prepareCall(sql); } public String nativeSQL(String sql) throws SQLException { return this.getRealConnection().nativeSQL(sql); } public boolean getAutoCommit() throws SQLException { return this.getRealConnection().getAutoCommit(); } public boolean isClosed() { return this.status != ConnectionStatus.INUSE; } public DatabaseMetaData getMetaData() throws SQLException { return this.getRealConnection().getMetaData(); } public void setReadOnly(boolean readOnly) throws SQLException { this.getRealConnection().setReadOnly(readOnly); } public boolean isReadOnly() throws SQLException { return this.getRealConnection().isReadOnly(); } public void setCatalog(String catalog) throws SQLException { this.getRealConnection().setCatalog(catalog); } public String getCatalog() throws SQLException { return this.getRealConnection().getCatalog(); } public void setTransactionIsolation(int level) throws SQLException { this.getRealConnection().setTransactionIsolation(level); } public int getTransactionIsolation() throws SQLException { return this.getRealConnection().getTransactionIsolation(); } public SQLWarning getWarnings() throws SQLException { return this.getRealConnection().getWarnings(); } public void clearWarnings() throws SQLException { this.updateActivetime(); this.getRealConnection().clearWarnings(); } /** @deprecated */ @Deprecated public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { throw new UnsupportedOperationException(); } /** @deprecated */ @Deprecated public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { throw new UnsupportedOperationException(); } public Map<String, Class<?>> getTypeMap() throws SQLException { return this.getRealConnection().getTypeMap(); } public void setTypeMap(Map<String, Class<?>> map) throws SQLException { this.getRealConnection().setTypeMap(map); } public void setHoldability(int holdability) throws SQLException { this.getRealConnection().setHoldability(holdability); } public int getHoldability() throws SQLException { return this.getRealConnection().getHoldability(); } public Savepoint setSavepoint() throws SQLException { return this.getRealConnection().setSavepoint(); } public Savepoint setSavepoint(String name) throws SQLException { return this.getRealConnection().setSavepoint(name); } public void releaseSavepoint(Savepoint savepoint) throws SQLException { this.getRealConnection().releaseSavepoint(savepoint); this.updateActivetime(); } /** @deprecated */ @Deprecated public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { throw new UnsupportedOperationException(); } /** @deprecated */ @Deprecated public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { throw new UnsupportedOperationException(); } /** @deprecated */ @Deprecated public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { throw new UnsupportedOperationException(); } /** @deprecated */ @Deprecated public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { throw new UnsupportedOperationException(); } /** @deprecated */ @Deprecated public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { throw new UnsupportedOperationException(); } /** @deprecated */ @Deprecated public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { throw new UnsupportedOperationException(); } public Clob createClob() throws SQLException { this.updateActivetime(); return this.getRealConnection().createClob(); } public Blob createBlob() throws SQLException { this.updateActivetime(); return this.getRealConnection().createBlob(); } public NClob createNClob() throws SQLException { this.updateActivetime(); return this.getRealConnection().createNClob(); } public SQLXML createSQLXML() throws SQLException { this.updateActivetime(); return this.getRealConnection().createSQLXML(); } public boolean isValid(int timeout) throws SQLException { return this.getRealConnection().isValid(timeout); } public void setClientInfo(String name, String value) throws SQLClientInfoException { this.getRealConnection().setClientInfo(name, value); } public void setClientInfo(Properties properties) throws SQLClientInfoException { this.getRealConnection().setClientInfo(properties); } public Properties getClientInfo() throws SQLException { return this.getRealConnection().getClientInfo(); } public String getClientInfo(String name) throws SQLException { return this.getRealConnection().getClientInfo(name); } public Array createArrayOf(String typeName, Object[] elements) throws SQLException { this.updateActivetime(); return this.getRealConnection().createArrayOf(typeName, elements); } public Struct createStruct(String typeName, Object[] attributes) throws SQLException { this.updateActivetime(); return this.getRealConnection().createStruct(typeName, attributes); } public long getActiveTime() { return this.activeTime; } public void setActiveTime(long activeTime) { this.activeTime = activeTime; } private void updateActivetime() { this.activeTime = System.currentTimeMillis(); } public boolean validate() { boolean m_useAble = false; try { if (ValidateTool.validate(this, this.ds.getDatabaseType())) { m_useAble = true; } else { m_useAble = false; } } catch (Exception var3) { log.warn("This connection:\"" + this.realConnection + "\" is not usable, it will be removed!", var3); m_useAble = false; } return m_useAble; } public String getDataSourceName() { return this.ds.getDataSourceName(); } public long getLastInUseTime() { return this.lastInUseTime; } public void changeStates(ConnectionStatus newStatus) { StatusHandlerFactory.getStatusHandler(this.getStatus(), this).changeStatus(newStatus); } ConnectionExeInfo getLastConnectionExeInfo() { return this.lastRunTimeInfo; } ConnectionExeInfo getCurConnectionExeInfo() { return this.curRunTimeInfo; } void setLastRunTimeInfo(ConnectionExeInfo lastRunTimeInfo) { this.lastRunTimeInfo = lastRunTimeInfo; } void setCurRunTimeInfo(ConnectionExeInfo curRunTimeInfo) { this.curRunTimeInfo = curRunTimeInfo; if (curRunTimeInfo != null) { curRunTimeInfo.setConnID(this.connID); } } public String toString() { return this.ds.getDataSourceName() + "::" + this.connID; } public String getCurConnectionExeInfoInfo() { return this.getCurConnectionExeInfo() != null ? this.getCurConnectionExeInfo().getInfo() : null; } long getFreeTime() { return this.getStatus() != ConnectionStatus.FREE ? 0L : this.curRunTimeInfo.getFreeTime(); } void realDestroyConn() throws SQLException { this.getRealConnection().close(); } public void setSchema(String schema) throws SQLException { } public String getSchema() throws SQLException { return null; } public void abort(Executor executor) throws SQLException { } public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { } public int getNetworkTimeout() throws SQLException { return 0; } }
能够看到里面 记录了真正的conn对象 重写了 commit等方法 逻辑 不进行真实提交, 同时记录了SPR的信息 好比sql记录:3d
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package uap.mw.ds.monitor; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Iterator; import java.util.LinkedList; import javax.transaction.Transaction; import nc.bs.logging.Logger; import uap.mw.ds.util.DSUtil; import uap.mw.trans.UAPTransaction; import uap.mw.trans.util.TransUtil; public class ConnectionExeInfo { private Thread curThread = null; private String transactionID; private long startTime = -1L; private long endTime = -1L; private LinkedList<SQLExeInfo> historySql = new ConnectionExeInfo.SQLExeList(); private String preSQL = ""; private boolean isClosed = false; private String connID = null; private Throwable connTrace = null; public ConnectionExeInfo() { this.connTrace = new Exception(); } public void startConn() { this.startTime = System.currentTimeMillis(); Transaction trans = TransUtil.getCurTransactionInfo(); if (trans instanceof UAPTransaction) { this.transactionID = ((UAPTransaction)trans).getKey(); } this.curThread = Thread.currentThread(); } public void startSQL(String sql, String[] params) { this.historySql.add(new SQLExeInfo(sql, params)); } public void endSQL(String sql, String[] params) { ((SQLExeInfo)this.historySql.peek()).endSql(sql); } public void preSQL(String sql) { this.preSQL = sql; } public void addBatch(String[] params) { ((SQLExeInfo)this.historySql.peek()).addBatch(params); } public void startPreSQL() { this.historySql.add(new SQLExeInfo(this.preSQL, true)); } public void endPreSQL() { ((SQLExeInfo)this.historySql.peek()).endSql(this.preSQL); } public void endConn() { this.endTime = System.currentTimeMillis(); this.isClosed = true; this.curThread = null; } public long getConnUseTime() { return this.endTime < 0L ? System.currentTimeMillis() - this.startTime : this.endTime - this.startTime; } public Thread getUseThread() { return this.curThread; } public String getTransactionID() { return this.transactionID; } public String getConnID() { return this.connID; } public void setConnID(String connID) { this.connID = connID; } public String getInfo() { StringBuffer info = new StringBuffer(""); info.append("connID::" + this.getConnID() + DSUtil.getBrTag()); info.append("curThread::" + this.getUseThread() + DSUtil.getBrTag()); Logger.error("", this.connTrace); info.append("ThreadDump::" + this.getStackTrace(this.connTrace)); info.append("transactionID::" + this.getTransactionID() + DSUtil.getBrTag()); info.append("isClosed::" + this.isClosed + DSUtil.getBrTag()); info.append("ConnUseTime::" + this.getConnUseTime() + DSUtil.getBrTag()); Iterator i$ = this.historySql.iterator(); while(i$.hasNext()) { SQLExeInfo sqlInfo = (SQLExeInfo)i$.next(); info.append(sqlInfo.toString()); } return info.toString(); } private String getStackTrace(Throwable e) { StringWriter sWriter = new StringWriter(); PrintWriter writer = new PrintWriter(sWriter); String var4; try { e.printStackTrace(writer); var4 = sWriter.toString(); } finally { writer.close(); } return var4; } public long getFreeTime() { return this.isClosed ? System.currentTimeMillis() - this.endTime : 0L; } public Throwable getConnTrace() { return this.connTrace; } public void setConnTrace(Throwable connTrace) { this.connTrace = connTrace; } class SQLExeList<T> extends LinkedList<T> { private static final long serialVersionUID = 1L; SQLExeList() { } public boolean add(T e) { if (this.size() > 30) { this.remove(); } return super.add(e); } } }
那么SPR如何记录SQL历史呢:代理
在NC最底层的JDBC查询实现 nc.jdbc.framework.JdbcSession#executeQuery(java.lang.String, nc.jdbc.framework.processor.ResultSetProcessor) 方法 咱们发现了一个关键调用:rs = statement.executeQuery(sql); 而其中 statement是uap.mw.ds.DBConnection#createStatement() 建立的: uap.mw.ds.UAPStatement对象
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package uap.mw.ds; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import uap.mw.ds.monitor.ConnectionExeInfo; public class UAPStatement extends UAPStatementSkelecton { private ConnectionExeInfo runTimeInfo = null; UAPStatement(Statement ps, ConnectionExeInfo runTimeInfo) { super(ps); this.runTimeInfo = runTimeInfo; } public int executeUpdate(String sql) throws SQLException { this.runTimeInfo.startSQL(sql, (String[])null); boolean var2 = false; int num; try { num = super.executeUpdate(sql); } finally { this.runTimeInfo.endSQL(sql, (String[])null); } return num; } public ResultSet executeQuery(String sql) throws SQLException { this.runTimeInfo.startSQL(sql, (String[])null); ResultSet result = null; try { result = super.executeQuery(sql); } finally { this.runTimeInfo.endSQL(sql, (String[])null); } return result; } }
就此 谜题解开, 这个类里 直接调用了 this.runTimeInfo.startSQL(sql, (String[])null); 方法记录了你的SQL,其余SQL同理。
咱们继续研究 事务开启的轨迹,接着上面 咱们发现 事务是调用了 this.tranStack.push(tranText); 方法,这个是一个 链表 存储的是 每个接口方法调用的事务上下文,一个被重写的内部类 咱们看看push方法:
class UAPTransactionManager implements IUAPTransactionManager, TransactionManager { private static Log log = Log.getInstance(IerpTransactionManager.class); private Thread curThread; private int m_nTranTimeOut; private int m_tranProp; private LinkedList<UAPTransactionContext> tranStack = new LinkedList<UAPTransactionContext>() { public UAPTransactionContext pop() { UAPTransactionContext curContext = (UAPTransactionContext)super.pop(); if (curContext.getTransType() == TransactionContextType.SOURCE || curContext.getTransType() == TransactionContextType.NULL) { UAPTransactionManager.this.resumeCurTrans(); } return curContext; } public void push(UAPTransactionContext item) { if (item.getTransType() == TransactionContextType.SOURCE || item.getTransType() == TransactionContextType.NULL) { UAPTransactionManager.this.suspendCurTrans(); } super.push(item); } };
UAPTransactionManager.this.suspendCurTrans(); 继续看。。。
private void suspendCurTrans() { if (!this.tranStack.isEmpty()) { UAPTransactionContext transContext = (UAPTransactionContext)this.tranStack.peek(); UAPTransaction uapTran = (UAPTransaction)transContext.getTransaction(); uapTran.suspendAllConnection(); } }
由于最后一个事务上下文是push的,因此是压入顶部,因此这里是peek 获取但不移除顶部第一个元素,也就是 后进先出原则。
拿到后 用最后一次事务上下文 暂停上下文持有的全部数据库连接。
public void suspendAllConnection() { Iterator i$ = this.getConnectionMap().values().iterator(); while(i$.hasNext()) { DBConnection conn = (DBConnection)i$.next(); conn.changeStates(ConnectionStatus.INUSE_SUSPEND); } }
其实就是拿到全部链接 进行状态记录下, 那么问题就是 这个连接集合他是如何维护的,只要知道了 他什么时候把db的连接对象放入这个map 咱们就知道了 他事务管理的实现原理,由于他最后无非就是 经过惟一的线程事务管理器 拿到全部事务上下文 根据事务上下文是否子事务 依次 循环里面的db连接集合(上面的DBConnect对象) 对他们集体进行 提交或回滚(调用 realCommit realRollback方法 而后触发调用 真正的jdbc conn对象的对应方法):
进过调试咱们发现 在 uap.mw.trans.UAPTransaction#enlistConnResource 这个方法提供了 当前事务上下文(栈顶的事务上下文)对db连接 存入到上面的db连接集合里, 触发方法调用在:
/** * 构造默认JdbcSession该JdbcSession会默认从当前访问的DataSource获得链接 */ public JdbcSession() throws DbException { try { Connection con = ConnectionFactory.getConnection(); dbType = DBUtil.getDbType(con); // dbType = DataSourceCenter.getInstance().getDatabaseType(); this.conn = con; } catch (SQLException e) { throw ExceptionFactory.getException(dbType, e.getMessage(), e); } }
获取连接调用的是 nc.jdbc.framework.DataSourceCenter 的 Connection dummy = DataSourceCenter.getInstance().getConnection();
走到了 uap.mw.ds.UAPDataSource#getTransConnection 方法 后调用
private DBConnection getTransConnection() throws SQLException { IUAPTransactionManager curTranManager = UAPTransactionManagerProxy.getCurTransManager(); UAPTransaction curTrans = (UAPTransaction)curTranManager.getTransaction(); DBConnection conn = curTrans.getConnectionFromCurTrans(this.dataSourceName); if (conn == null) { conn = this.connPool.getConnection(); if (conn != null) { this.setConnToUse(conn, true, curTrans, curTranManager); } } return conn; }
而后:
private void setConnToUse(DBConnection conn, boolean isTrans, UAPTransaction curTrans, IUAPTransactionManager curTranManager) throws SQLException { try { conn.changeStates(ConnectionStatus.INUSE); if (isTrans) { curTrans.enlistConnResource(this.dataSourceName, conn); conn.setAutoCommit(false); } } catch (Exception var6) { conn.setStatus(ConnectionStatus.NEED_DESTROY); conn.realDestroyConn(); this.connPool.releasePhore(); DBLogger.error("set Conn State error!", var6); throw new SQLException("set Conn State error!!"); } }
至此,咱们就已经了解了 整个NC事务的实现准备。 那么最后 就是 事务管理器的 commit和rollback的代码了:
public void uap.mw.trans.UAPTransactionManagerProxy#end(Exception ex) { IUAPTransactionManager m_tranManager = (IUAPTransactionManager)tm_local.get(); try { if (ex != null) { if (m_tranManager.getTranContext().needRBPoint()) { if (!((UAPTransaction)((UAPTransaction)m_tranManager.getTranContext().getTransaction())).getRollbackOnly()) { m_tranManager.rollBackToCurInvokePoint(); return; } } else { m_tranManager.setCurTransRollBack(); } } m_tranManager.commit(); } catch (Exception var4) { log.error("", var4); } }
而后根据是否有异常决定直接 commit 仍是 rollback到某个检查点 也就是 当前栈顶的新独立子事务!
public void uap.mw.trans.UAPTransactionManager#rollBackToCurInvokePoint() throws SQLException { UAPTransactionContext curTransContext = (UAPTransactionContext)this.tranStack.pop(); UAPTransaction trans = (UAPTransaction)this.getTransaction(); Map<String, Savepoint> savePointMap = curTransContext.getSavePointMap(); Map<String, DBConnection> connMap = trans.getConnectionMap(); Iterator i$ = connMap.keySet().iterator(); while(i$.hasNext()) { String dsName = (String)i$.next(); DBConnection conn = (DBConnection)connMap.get(dsName); Savepoint sPoint = (Savepoint)savePointMap.get(dsName); if (sPoint != null) { conn.realRollback(sPoint); } else { conn.realRollback(); } } }
public void setCurTransRollBack() throws IllegalStateException, SystemException { UAPTransactionContext context = this.getTranContext(); Transaction trans = context.getTransaction(); if (trans != null) { trans.setRollbackOnly(); } }
最终的提交实现:
public void uap.mw.trans.UAPTransactionManager#commit() throws RollbackException, IllegalStateException, SystemException, HeuristicRollbackException, SecurityException, HeuristicMixedException { UAPTransactionContext curTransContext = (UAPTransactionContext)this.tranStack.pop(); if (curTransContext != null && !curTransContext.isNullTrans() && !curTransContext.isJoined()) { try { Transaction curTran = curTransContext.getTransaction(); List<Synchronization> synList = ((UAPTransaction)curTran).getSynchronization(); Iterator i$; Synchronization s; if (synList != null && synList.size() > 0) { i$ = synList.iterator(); while(i$.hasNext()) { s = (Synchronization)i$.next(); s.beforeCompletion(); } } try { curTran.commit(); } catch (Exception var9) { log.error("commit error", var9); } if (synList != null && synList.size() > 0) { i$ = synList.iterator(); while(i$.hasNext()) { s = (Synchronization)i$.next(); s.afterCompletion(curTran.getStatus()); } } } finally { if (this.tranStack.isEmpty()) { this.removeCurTransManager(); } } } }
public void uap.mw.trans.UAPTransaction#commit() throws SecurityException, SystemException, RollbackException, HeuristicMixedException, HeuristicRollbackException { if (this.m_bRollbackOnly) { this.rollback(); } else { this.m_status = 8; boolean hasException = false; for(Iterator i$ = this.connecionMap.values().iterator(); i$.hasNext(); this.m_status = 3) { DBConnection conn = (DBConnection)i$.next(); try { conn.realCommit(); } catch (Exception var13) { DBLogger.error(var13.getMessage() + ":" + conn.getDataSourceName(), var13); hasException = true; } finally { try { conn.backPool(); } catch (SQLException var12) { DBLogger.error(var12.getMessage(), var12); } } } this.connecionMap.clear(); this.transEnd(); if (hasException) { throw new SystemException("commit error,Please check log"); } } }