直接入正题,如下用数据库操做举例。java
查看ArrayList.stream()方法的实现,如图:mysql
发现方法实在Collection接口里面实现的,能够看出Stream对象是由StreamSupport.stream()方法初始化的。第一个入参是Spliterator实例,第二个入参,true表示执行的是并行操做,反之。查看api能够发现StreamSupport.Stream()方法还有另一个以Suplier Function为数据源的重载方法,这里不举例。sql
从新定位上图spliterator()方法,如图:数据库
查看this的类型:api
能够看到Spliterators.spliterator()方法的第一个入参是Iterator实例,第二个入参看文档貌似是用于适当分割数据源,增长批处理大小(batch size)的,填0的话,内部会替换为默认的数值,不然取自定义的值,这里不深究,固然也但愿有大牛不吝解惑。ide
好了,到这里就能够实现本身的Stream api了;测试
直接上码:this
1. 建立实现Iterator接口:spa
package com.qkf.test.iterator; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Iterator; import com.qkf.test.bean.Record; /** * 用于构建Stream的Source * @author qkf */ public class ResultSetIterator implements Iterator<Record> { private Connection conn; private ResultSet rs; private PreparedStatement ps; public ResultSetIterator(Connection conn, String sql, Object... params) { assert conn != null; assert sql != null; this.conn = conn; try { ps = conn.prepareStatement(sql); if (params != null && params.length > 0) { for (int i = 1; i <= params.length; ++i) { ps.setObject(i, params[i-1]); } } rs = ps.executeQuery(); } catch (SQLException e) { closeRes(); e.printStackTrace(); } } @Override public boolean hasNext() { try { return rs.next(); } catch (SQLException e) { closeRes(); e.printStackTrace(); } return false; } @Override public Record next() { return new Record(rs); } private void close(AutoCloseable... closeable) { if (closeable != null) { for (AutoCloseable c : closeable) { try { c.close(); } catch (Exception e) { // nothing to do } } } } /** * 关闭资源 */ private void closeRes() { close(this.rs, this.ps, this.conn); } }
以上用到的Record类表示数据库的一行记录,上码:code
package com.qkf.test.bean; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 表示数据行记录 * @author qkf */ public class Record { private Map<String, Object> columnMap = null; // 行记录保存为key-value形式 private List<Object> columnList = null; // 行记录保存为列表形式 private int columnCount; // 行记录列数 public Record(ResultSet rs) { columnMap = new HashMap<>(); columnList = new ArrayList<>(); try { ResultSetMetaData metaData = rs.getMetaData(); columnCount = metaData.getColumnCount(); if (columnCount > 0) { for (int i = 1; i <= columnCount; ++i) { Object obj = rs.getObject(i); columnList.add(obj); columnMap.put(metaData.getColumnLabel(i), obj); } } } catch (SQLException e) { e.printStackTrace(); } } public Integer getAsInt(int index) { return Integer.valueOf(columnList.get(index).toString()); } public Integer getAsInt(String name) { return Integer.valueOf(columnMap.get(name).toString()); } public Double getAsDouble(int index) { return Double.valueOf(columnList.get(index).toString()); } public Double getAsDouble(String name) { return Double.valueOf(columnMap.get(name).toString()); } public String getAsString(int index) { return String.valueOf(columnList.get(index).toString()); } public String getAsString(String name) { return String.valueOf(columnMap.get(name).toString()); } public int getColumnSize() { return this.columnCount; } @Override public String toString() { return columnMap.toString(); } }
2.根据开头阅读源码获得的Api自定义Stream api:
package com.qkf.test; import java.sql.Connection; import java.sql.DriverManager; import java.util.Spliterators; import java.util.stream.Stream; import java.util.stream.StreamSupport; import com.qkf.test.bean.Record; import com.qkf.test.iterator.ResultSetIterator; /** * 数据库操做Stream api */ public class Records { public static Stream<Record> stream(String sql, Object... params) { try { // 注意: 这里使用try-with-resource处理Connection的话,数据库链接会提早关闭而致使如下读取数据库的操做抛异常 Connection conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/users?useUnicode=true&characterEncoding=utf8", "qkf", "Fengqik@5811"); return StreamSupport.stream(Spliterators.spliteratorUnknownSize( new ResultSetIterator(conn, sql, params), 0), false); } catch(Throwable e) { System.err.println("Connection refused!"); return Stream.empty(); } } }
OK, 以上就是本篇的所有代码,下面编写测试代码:
测试数据:
测试代码:
package com.qkf.test; /** * test */ public class StreamTest { public static void before() { try { Class.forName("com.mysql.cj.jdbc.Driver"); } catch(Exception e) {} } public static void main(String[] args) { before(); String sql = "select * from users where age <= ?"; Records.stream( sql, 24 // sql params ).peek(r -> { System.out.println("Record: " + r); }).filter(r -> "男".equals(r.getAsString("sex")) ).forEach(r -> System.out.println( String.format("hit: name: %s, age: %d", r.getAsString("name"), r.getAsInt(2))) ); } }
运行结果:
运行成功。
总结一下:建立Stream实例由StreamSupport.stream(Spliterator实例, 是否并行); 而Spliterator实例能够经过Spliterators工厂类构造获得,本例使用的工厂方法入参是(自定义的Iterator类的实例, 特征(整形, characteristics));
大概就这样。
若有错误和不足,望斧正。
晚了,睡觉。谢谢阅读。