源码分析Mybatis系列目录:
一、源码分析Mybatis MapperProxy初始化【图文并茂】
二、源码分析Mybatis MappedStatement的建立流程
三、【图文并茂】Mybatis执行SQL的4大基础组件详解java若是图不清晰的话,能够查看CSDN博客连接:http://www.javashuo.com/article/p-rehfkmue-mh.htmlspring
本文将详细介绍Mybatis SQL语句执行的全流程,本文与上篇具备必定的关联性,建议先阅读该系列中的前面3篇文章,重点掌握Mybatis Mapper类的初始化过程,由于在Mybatis中,Mapper是执行SQL语句的入口,相似下面这段代码:sql
1@Service 2public UserService implements IUserService { 3 @Autowired 4 private UserMapper userMapper; 5 public User findUser(Integer id) { 6 return userMapper.find(id); 7 } 8}
开始进入本文的主题,以源码为手段,分析Mybatis执行SQL语句的流行,而且使用了数据库分库分表中间件sharding-jdbc,其版本为sharding-jdbc1.4.1。数据库
为了方便你们对本文的源码分析,先给出Mybatis层面核心类的方法调用序列图。缓存
接下来从从源码的角度对其进行剖析。微信
舒适提示:在本文的末尾,还会给出一张详细的Mybatis Shardingjdbc语句执行流程图。(请勿错过哦)。mybatis
1public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 2 if (Object.class.equals(method.getDeclaringClass())) { 3 try { 4 return method.invoke(this, args); 5 } catch (Throwable t) { 6 throw ExceptionUtil.unwrapThrowable(t); 7 } 8 } 9 final MapperMethod mapperMethod = cachedMapperMethod(method); // @1 10 return mapperMethod.execute(sqlSession, args); // @2 11 }
代码@1:建立并缓存MapperMethod对象。app
代码@2:调用MapperMethod对象的execute方法,即mapperInterface中定义的每个方法最终会对应一个MapperMethod。框架
1public Object execute(SqlSession sqlSession, Object[] args) { 2 Object result; 3 if (SqlCommandType.INSERT == command.getType()) { 4 Object param = method.convertArgsToSqlCommandParam(args); 5 result = rowCountResult(sqlSession.insert(command.getName(), param)); 6 } else if (SqlCommandType.UPDATE == command.getType()) { 7 Object param = method.convertArgsToSqlCommandParam(args); 8 result = rowCountResult(sqlSession.update(command.getName(), param)); 9 } else if (SqlCommandType.DELETE == command.getType()) { 10 Object param = method.convertArgsToSqlCommandParam(args); 11 result = rowCountResult(sqlSession.delete(command.getName(), param)); 12 } else if (SqlCommandType.SELECT == command.getType()) { 13 if (method.returnsVoid() && method.hasResultHandler()) { 14 executeWithResultHandler(sqlSession, args); 15 result = null; 16 } else if (method.returnsMany()) { 17 result = executeForMany(sqlSession, args); 18 } else if (method.returnsMap()) { 19 result = executeForMap(sqlSession, args); 20 } else { 21 Object param = method.convertArgsToSqlCommandParam(args); 22 result = sqlSession.selectOne(command.getName(), param); 23 } 24 } else { 25 throw new BindingException("Unknown execution method for: " + command.getName()); 26 } 27 if (result == null && method.getReturnType().isPrimitive() && !method.returnsVoid()) { 28 throw new BindingException("Mapper method '" + command.getName() 29 + " attempted to return null from a method with a primitive return type (" + method.getReturnType() + ")."); 30 } 31 return result; 32 }
该方法主要是根据SQL类型,insert、update、select等操做,执行对应的逻辑,本文咱们以查询语句,进行跟踪,进入executeForMany(sqlSession, args)方法。运维
1private <E> Object executeForMany(SqlSession sqlSession, Object[] args) { 2 List<E> result; 3 Object param = method.convertArgsToSqlCommandParam(args); 4 if (method.hasRowBounds()) { 5 RowBounds rowBounds = method.extractRowBounds(args); 6 result = sqlSession.<E>selectList(command.getName(), param, rowBounds); 7 } else { 8 result = sqlSession.<E>selectList(command.getName(), param); 9 } 10 // issue #510 Collections & arrays support 11 if (!method.getReturnType().isAssignableFrom(result.getClass())) { 12 if (method.getReturnType().isArray()) { 13 return convertToArray(result); 14 } else { 15 return convertToDeclaredCollection(sqlSession.getConfiguration(), result); 16 } 17 } 18 return result; 19 }
该方法也比较简单,最终经过SqlSession调用selectList方法。
1public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) { 2 try { 3 MappedStatement ms = configuration.getMappedStatement(statement); // @1 4 List<E> result = executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER); // @2 5 return result; 6 } catch (Exception e) { 7 throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e); 8 } finally { 9 ErrorContext.instance().reset(); 10 } 11 }
代码@1:根据资源名称获取对应的MappedStatement对象,此时的statement为资源名称,例如com.demo.UserMapper.findUser。至于MappedStatement对象的生成在上一节初始化时已详细介绍过,此处再也不重复介绍。
代码@2:调用Executor的query方法。这里说明一下,其实一开始会进入到CachingExecutor#query方法,因为CachingExecutor的Executor delegate属性默认是SimpleExecutor,故最终仍是会进入到SimpleExecutor#query中。
接下来咱们进入到SimpleExecutor的父类BaseExecutor的query方法中。
1public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { // @1 2 ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId()); 3 if (closed) throw new ExecutorException("Executor was closed."); 4 if (queryStack == 0 && ms.isFlushCacheRequired()) { 5 clearLocalCache(); 6 } 7 List<E> list; 8 try { 9 queryStack++; 10 list = resultHandler == null ? (List<E>) localCache.getObject(key) : null; // @2 11 if (list != null) { 12 handleLocallyCachedOutputParameters(ms, key, parameter, boundSql); 13 } else { 14 list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql); // @3 15 } 16 } finally { 17 queryStack--; 18 } 19 if (queryStack == 0) { 20 for (DeferredLoad deferredLoad : deferredLoads) { 21 deferredLoad.load(); 22 } 23 deferredLoads.clear(); // issue #601 24 if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) { // @4 25 clearLocalCache(); // issue #482 26 } 27 } 28 return list; 29 }
代码@1:首先介绍一下该方法的入参,这些类都是Mybatis的重要类:
代码@3:从数据库查询结果,而后进入到doQuery方法,执行真正的查询动做。
代码@4:若是一级缓存是语句级别的,则语句执行完毕后,删除缓存。
1public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException { 2 Statement stmt = null; 3 try { 4 Configuration configuration = ms.getConfiguration(); 5 StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql); // @1 6 stmt = prepareStatement(handler, ms.getStatementLog()); // @2 7 return handler.<E>query(stmt, resultHandler); // @3 8 } finally { 9 closeStatement(stmt); 10 } 11 }
代码@1:建立StatementHandler,这里会加入Mybatis的插件扩展机制(将在下篇详细介绍),如图所示:
在这里插入图片描述
代码@2:建立Statement对象,注意,这里就是JDBC协议的java.sql.Statement对象了。
代码@3:使用Statment对象执行SQL语句。
接下来详细介绍Statement对象的建立过程与执行过程,即分布详细跟踪代码@2与代码@3。
1private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException { 2 Statement stmt; 3 Connection connection = getConnection(statementLog); // @1 4 stmt = handler.prepare(connection); // @2 5 handler.parameterize(stmt); // @3 6 return stmt; 7}
建立Statement对象,分红三步:
代码@1:建立java.sql.Connection对象。
代码@2:使用Connection对象建立Statment对象。
代码@3:对Statement进行额外处理,特别是PrepareStatement的参数设置(ParameterHandler)。
getConnection方法,根据上面流程图所示,先是进入到SpringManagedTransaction,再经过spring-jdbc框架,利用DataSourceUtils获取链接,其代码以下:
1org.mybatis.spring.transaction.SpringManagedTransaction#doGetConnection 2public static Connection doGetConnection(DataSource dataSource) throws SQLException { 3 Assert.notNull(dataSource, "No DataSource specified"); 4 ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); 5 if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) { 6 conHolder.requested(); 7 if (!conHolder.hasConnection()) { 8 conHolder.setConnection(dataSource.getConnection()); 9 } 10 return conHolder.getConnection(); 11 } 12 // Else we either got no holder or an empty thread-bound holder here. 13 14 logger.debug("Fetching JDBC Connection from DataSource"); 15 Connection con = dataSource.getConnection(); // @1 16 17 // 这里省略与事务处理相关的代码 18 return con; 19 }
代码@1:经过DataSource获取connection,那此处的DataSource是“谁”呢?看一下咱们工程的配置:
故最终dataSouce.getConnection获取的链接,是从SpringShardingDataSource中获取链接。
1com.dangdang.ddframe.rdb.sharding.jdbc.ShardingDataSource#getConnection 2public ShardingConnection getConnection() throws SQLException { 3 MetricsContext.init(shardingProperties); 4 return new ShardingConnection(shardingContext); 5}
返回的结果以下:
备注:这里只是返回了一个ShardingConnection对象,该对象包含了分库分表上下文,但此时并无执行具体的分库操做(切换数据源)。
Connection的获取流程清楚后,咱们继续来看一下Statemnet对象的建立。
1stmt = prepareStatement(handler, ms.getStatementLog());
上面语句的调用链:RoutingStatementHandler -》BaseStatementHand
3public Statement prepare(Connection connection) throws SQLException { 4 ErrorContext.instance().sql(boundSql.getSql()); 5 Statement statement = null; 6 try { 7 statement = instantiateStatement(connection); // @1 8 setStatementTimeout(statement); // @2 9 setFetchSize(statement); // @3 10 return statement; 11 } catch (SQLException e) { 12 closeStatement(statement); 13 throw e; 14 } catch (Exception e) { 15 closeStatement(statement); 16 throw new ExecutorException("Error preparing statement. Cause: " + e, e); 17 } 18 }
代码@1:根据Connection对象(本文中是ShardingConnection)来建立Statement对象,其默认实现类:PreparedStatementHandler#instantiateStatement方法。
代码@2:为Statement设置超时时间。
代码@3:设置fetchSize。
1PreparedStatementHandler#instantiateStatement 2protected Statement instantiateStatement(Connection connection) throws SQLException { 3 String sql = boundSql.getSql(); 4 if (mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) { 5 String[] keyColumnNames = mappedStatement.getKeyColumns(); 6 if (keyColumnNames == null) { 7 return connection.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS); 8 } else { 9 return connection.prepareStatement(sql, keyColumnNames); 10 } 11 } else if (mappedStatement.getResultSetType() != null) { 12 return connection.prepareStatement(sql, mappedStatement.getResultSetType().getValue(), ResultSet.CONCUR_READ_ONLY); 13 } else { 14 return connection.prepareStatement(sql); 15 } 16 }
其实Statement对象的建立,就比较简单了,既然Connection是ShardingConnection,那就看一下其对应的prepareStatement方法便可。
1 3public PreparedStatement prepareStatement(final String sql) throws SQLException { // sql,为配置在mybatis xml文件中的sql语句 4 return new ShardingPreparedStatement(this, sql); 5} 6ShardingPreparedStatement(final ShardingConnection shardingConnection, 7 final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { 8 super(shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability); 9 preparedSQLRouter = shardingConnection.getShardingContext().getSqlRouteEngine().prepareSQL(sql); 10}
在构建ShardingPreparedStatement对象的时候,会根据SQL语句建立解析SQL路由的解析器对象,但此时并不会执行相关的路由计算,PreparedStatement对象建立完成后,就开始进入SQL执行流程中。
SQL执行流程
接下来咱们继续看SimpleExecutor#doQuery方法的第3步,执行SQL语句:
1handler.<E>query(stmt, resultHandler)。
首先会进入RoutingStatementHandler这个类中,进行Mybatis层面的路由(主要是根据Statement类型)
而后进入到PreparedStatementHandler#query中。
3public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException { 4 PreparedStatement ps = (PreparedStatement) statement; 5 ps.execute(); // @1 6 return resultSetHandler.<E> handleResultSets(ps); // @2 7}
代码@2:处理结果。
咱们接下来分别来跟进execute与结果处理方法。
ShardingPreparedStatement#execute
2public boolean execute() throws SQLException { 3 try { 4 return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).execute(); // @1 5 } finally { 6 clearRouteContext(); 7 } 8}
这里奥妙无穷,其关键点以下:
1)创造PreparedStatementExecutor对象,其两个核心参数:
3)最终调用PreparedStatementExecutor方法的execute来执行。
接下来分别看一下routeSQL与execute方法。
3private List<PreparedStatementExecutorWrapper> routeSQL() throws SQLException { 4 List<PreparedStatementExecutorWrapper> result = new ArrayList<>(); 5 SQLRouteResult sqlRouteResult = preparedSQLRouter.route(getParameters()); // @1 6 MergeContext mergeContext = sqlRouteResult.getMergeContext(); 7 setMergeContext(mergeContext); 8 setGeneratedKeyContext(sqlRouteResult.getGeneratedKeyContext()); 9 for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) { // @2 10 PreparedStatement preparedStatement = (PreparedStatement) getStatement(getShardingConnection().getConnection(each.getDataSource(), sqlRouteResult.getSqlStatementType()), each.getSql()); // @3 11 replayMethodsInvocation(preparedStatement); 12 getParameters().replayMethodsInvocation(preparedStatement); 13 result.add(wrap(preparedStatement, each)); 14 } 15 return result; 16}
代码@1:根据SQL参数进行路由计算,本文暂不关注其具体实现细节,这些将在具体分析Sharding-jdbc时具体详解,在这里就直观看一下其结果:
代码@二、@3:对分库分表的结果进行遍历,而后使用底层Datasource来建立Connection,建立PreparedStatement 对象。
routeSQL就暂时讲到这,从这里咱们得知,会在这里根据路由结果,使用底层的具体数据源建立对应的Connection与PreparedStatement 对象。
1 3public boolean execute() { 4 Context context = MetricsContext.start("ShardingPreparedStatement-execute"); 5 eventPostman.postExecutionEvents(); 6 final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); 7 final Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); 8 try { 9 if (1 == preparedStatementExecutorWrappers.size()) { // @1 10 PreparedStatementExecutorWrapper preparedStatementExecutorWrapper = preparedStatementExecutorWrappers.iterator().next(); 11 return executeInternal(preparedStatementExecutorWrapper, isExceptionThrown, dataMap); 12 } 13 List<Boolean> result = executorEngine.execute(preparedStatementExecutorWrappers, new ExecuteUnit<PreparedStatementExecutorWrapper, Boolean>() { // @2 14 15 @Override 16 public Boolean execute(final PreparedStatementExecutorWrapper input) throws Exception { 17 synchronized (input.getPreparedStatement().getConnection()) { 18 return executeInternal(input, isExceptionThrown, dataMap); 19 } 20 } 21 }); 22 return (null == result || result.isEmpty()) ? false : result.get(0); 23 } finally { 24 MetricsContext.stop(context); 25 } 26 }
代码@1:若是计算出来的路由信息为1个,则同步执行。
代码@2:若是计算出来的路由信息有多个,则使用线程池异步执行。
那还有一个问题,经过PreparedStatement#execute方法执行后,如何返回结果呢?特别是异步执行的。
在上文其实已经谈到:
1 3public List<Object> handleResultSets(Statement stmt) throws SQLException { 4 ErrorContext.instance().activity("handling results").object(mappedStatement.getId()); 5 6 final List<Object> multipleResults = new ArrayList<Object>(); 7 8 int resultSetCount = 0; 9 ResultSetWrapper rsw = getFirstResultSet(stmt); // @1 10 //省略部分代码,完整代码能够查看DefaultResultSetHandler方法。 11 return collapseSingleResultList(multipleResults); 12 } 13 14private ResultSetWrapper getFirstResultSet(Statement stmt) throws SQLException { 15 ResultSet rs = stmt.getResultSet(); // @2 16 while (rs == null) { 17 // move forward to get the first resultset in case the driver 18 // doesn't return the resultset as the first result (HSQLDB 2.1) 19 if (stmt.getMoreResults()) { 20 rs = stmt.getResultSet(); 21 } else { 22 if (stmt.getUpdateCount() == -1) { 23 // no more results. Must be no resultset 24 break; 25 } 26 } 27 } 28 return rs != null ? new ResultSetWrapper(rs, configuration) : null; 29 }
咱们看一下其关键代码以下:
代码@1:调用Statement#getResultSet()方法,若是使用shardingJdbc,则会调用ShardingStatement#getResultSet(),并会处理分库分表结果集的合并,在这里就不详细进行介绍,该部分会在shardingjdbc专栏详细分析。
代码@2:jdbc statement中获取结果集的通用写法,这里也不过多的介绍。
mybatis shardingjdbc SQL执行流程就介绍到这里了,为了方便你们对上述流程的理解,最后给出SQL执行的流程图:
Mybatis Sharding-Jdbc的SQL执行流程就介绍到这里了,从图中也能清晰看到Mybatis的拆件机制,将在下文详细介绍。
查看更多文章请关注微信公众号:
一波广告来袭,做者新书《RocketMQ技术内幕》已出版上市:
《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎全部的配置参数。本书获得了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度承认并做序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。