JAVAjava
前面介绍了裸露JDBC 方式使用流式编程,下面介绍下MYbatis中两种使用流式查询方法mysql
<bean id="myMyBatisCursorItemReader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"> <property name="sqlSessionFactory" ref="sqlSessionFactory" /> <property name="queryId" value="com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData" /> </bean>
其中queryId为mapper文件中接口名称。spring
sql
image.png数据库
其中fetchSize=”-2147483648″,Integer.MIN_VALUE=-2147483648编程
static void testCursor1() throws UnexpectedInputException, ParseException, Exception { try { Map<String, Object> param = new HashMap<String, Object>(); AccsDeviceInfoDAOExample accsDeviceInfoDAOExample = new AccsDeviceInfoDAOExample(); accsDeviceInfoDAOExample.createCriteria().andAppKeyEqualTo("12345").andAppVersionEqualTo("5.7.2.4.5") .andPackageNameEqualTo("com.test.zlx"); param.put("oredCriteria", accsDeviceInfoDAOExample.getOredCriteria()); // 设置参数 myMyBatisCursorItemReader.setParameterValues(param); // 建立游标 myMyBatisCursorItemReader.open(new ExecutionContext()); //使用游标迭代获取每一个记录 Long count = 0L; AccsDeviceInfoDAO accsDeviceInfoDAO; while ((accsDeviceInfoDAO = myMyBatisCursorItemReader.read()) != null) { System.out.println(JSON.toJSONString(accsDeviceInfoDAO)); ++count; System.out.println(count); } } catch (Exception e) { System.out.println("error:" + e.getLocalizedMessage()); } finally { // do some myMyBatisCursorItemReader.close(); } }
做用从session工厂获取一个session,而后调用session的selectCursor,它最终会调用缓存
ConnectionImpl的prepareStatement方法:服务器
public java.sql.PreparedStatement prepareStatement(String sql) throws SQLException { return prepareStatement(sql, DEFAULT_RESULT_SET_TYPE, DEFAULT_RESULT_SET_CONCURRENCY); } private static final int DEFAULT_RESULT_SET_TYPE = ResultSet.TYPE_FORWARD_ONLY; private static final int DEFAULT_RESULT_SET_CONCURRENCY = ResultSet.CONCUR_READ_ONLY;
至此三个条件知足了两个,在加上咱们本身设置的fetchSize就通知mysql要建立流式ResultSet。
那么fectchsize何处设置那?网络
image.pngsession
图中1建立prepareStatement,2设置fetchSize.
设置后最后会调用MysqlIO的sqlQueryDirect方法执行具体sql并把结果resultset存放到JDBC4PrepardStatement中。
read函数做用是从结果集resultset中获取数据,首先调用.next判断是否有数据,有的话则读取数据。
这和纯粹JDBC编程方式就同样了,只是read函数对其进行了包装。

image.png
其中fetchSize=”-2147483648″,Integer.MIN_VALUE=-2147483648
static void testCursor2() { SqlSession session = sqlSessionFactory.openSession(); Map<String, Object> param = new HashMap<String, Object>(); AccsDeviceInfoDAOExample accsDeviceInfoDAOExample = new AccsDeviceInfoDAOExample(); accsDeviceInfoDAOExample.createCriteria().andAppKeyEqualTo("12345").andAppVersionEqualTo("1.2.3.4") .andPackageNameEqualTo("com.hello.test"); param.put("oredCriteria", accsDeviceInfoDAOExample.getOredCriteria()); session.select("com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData", param, new ResultHandler() { @Override public void handleResult(ResultContext resultContext) { AccsDeviceInfoDAO accsDeviceInfoDAO = (AccsDeviceInfoDAO) resultContext.getResultObject(); System.out.println(resultContext.getResultCount()); System.out.println(JSON.toJSONString(accsDeviceInfoDAO)); } }); }
相似第三节,只是第三节返回了操做ResultSet的游标让用户本身迭代获取数据,而如今是内部直接操做ResultSet逐条获取数据并调用回调handler的handleResult方法进行处理。
private void handleRowValuesForSimpleResultMap(ResultSetWrapper rsw, ResultMap resultMap, ResultHandler<?> resultHandler, RowBounds rowBounds, ResultMapping parentMapping) throws SQLException { DefaultResultContext<Object> resultContext = new DefaultResultContext<Object>(); skipRows(rsw.getResultSet(), rowBounds); while (shouldProcessMoreRows(resultContext, rowBounds) && rsw.getResultSet().next()) { ResultMap discriminatedResultMap = resolveDiscriminatedResultMap(rsw.getResultSet(), resultMap, null); Object rowValue = getRowValue(rsw, discriminatedResultMap); storeObject(resultHandler, resultContext, rowValue, parentMapping, rsw.getResultSet()); } } private void storeObject(ResultHandler<?> resultHandler, DefaultResultContext<Object> resultContext, Object rowValue, ResultMapping parentMapping, ResultSet rs) throws SQLException { if (parentMapping != null) { linkToParents(rs, parentMapping, rowValue); } else { callResultHandler(resultHandler, resultContext, rowValue); } } //调用回调 @SuppressWarnings("unchecked" /* because ResultHandler<?> is always ResultHandler<Object>*/) private void callResultHandler(ResultHandler<?> resultHandler, DefaultResultContext<Object> resultContext, Object rowValue) { resultContext.nextResultObject(rowValue); ((ResultHandler<Object>) resultHandler).handleResult(resultContext); }
流式编程使用裸露JDBC编程最简单,灵活,可是sql语句须要分散写到须要调用数据库操做的地方,不便于维护,Mybatis底层仍是使用裸露JDBC编程API实现的,而且使用xml文件统一管理sql语句,虽然解析执行时候会有点开销(好比每次调用都是反射进行的),可是同时还提供了缓存。
对于同等条件下搜索结果为600万条记录的时候使用游标与不使用时候内存占用对比:
image.png
粘贴图片.png
可知非流式时候内存会随着搜出来的记录增加而近乎直线增加,流式时候则比较平稳,另外非流式因为须要mysql服务器准备所有数据,因此调用后不会立刻返回,须要根据数据量大小不一样会等待一段时候才会返回,这时候调用方线程会阻塞,流式则由于每次返回一条记录,因此返回速度会很快。
裸露JDBC流式使用参考: http://www.jianshu.com/p/c1e6eeb71c74
这里在总结下:client发送select请求给Server后,Server根据条件筛选符合条件的记录,而后就会把记录发送到本身的发送buffer,等buffer满了就flush缓存(这里要注意的是若是client的接受缓存满了,那么Server的发送就会阻塞主,直到client的接受缓存空闲。),经过网络发送到client的接受缓存,当不用游标时候MySqIo就会从接受缓存里面逐个读取记录到resultset。就这样client 从本身的接受缓存读取数据到resultset,同时Server端不断经过网络向client接受缓存发送数据,直到全部记录都放到了resultset。
若是使用了游标,则用户调用resultset的next的频率决定了Server发送时候的阻塞状况,若是用户调用next快,那么client的接受缓存就会有空闲,那么Server就会把数据发送过来,若是用户调用的慢,那么因为接受缓存腾不出来,Server的发送就会阻塞