Spring Batch(6)——数据库批数据读写

前序文章陆续介绍了批处理的基本概念Job使用Step控制Item的结构以及扁平文件的读写。本文将接着前面的内容说明数据库如何进行批处理读写。git

数据读取

数据库是绝大部分系统要用到的数据存储工具,所以针对数据库执行批量数据处理任务也是很常见的需求。数据的批量处理与常规业务开发不一样,若是一次性读取百万条,对于任何系统而言确定都是不可取的。为了解决这个问题Spring Batch提供了2套数据读取方案:github

  • 基于游标读取数据
  • 基于分页读取数据

游标读取数据

对于有经验大数据工程师而言数据库游标的操做应该是很是熟悉的,由于这是从数据库读取数据流标准方法,并且在Java中也封装了ResultSet这种面向游标操做的数据结构。spring

ResultSet一直都会指向结果集中的某一行数据,使用next方法可让游标跳转到下一行数据。Spring Batch一样使用这个特性来控制数据的读取:数据库

  1. 在初始化时打开游标。
  2. 每一次调用ItemReader::read方法就从ResultSet获取一行数据并执行next
  3. 返回可用于数据处理的映射结构(map、dict)。

在一切都执行完毕以后,框架会使用回调过程调用ResultSet::close来关闭游标。因为全部的业务过程都绑定在一个事物之上,因此知道到Step执行完毕或异常退出调用执行close。下图展现了数据读取的过程:缓存

SQL语句的查询结果称为数据集(对于大部分数据库而言,其SQL执行结果会产生临时的表空间索引来存放数据集)。游标开始会停滞在ID=2的位置,一次ItemReader执行完毕后会产生对应的实体FOO2,而后游标下移直到最后的ID=6。最后关闭游标。session

JdbcCursorItemReader

JdbcCursorItemReader是使用游标读取数据集的ItemReader实现类之一。它使用JdbcTemplate中的DataSource控制ResultSet,其过程是将ResultSet的每行数据转换为所须要的实体类。数据结构

JdbcCursorItemReader的执行过程有三步:app

  1. 经过DataSource建立JdbcTemplate
  2. 设定数据集的SQL语句。
  3. 建立ResultSet到实体类的映射。 大体以下:
//随风溜达的向日葵 chkui.com
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());

除了上面的代码,JdbcCursorItemReader还有其余属性:框架

属性名称 说明
ignoreWarnings 标记当执行SQL语句出现警告时,是输出日志仍是抛出异常,默认为true——输出日志
fetchSize 预通知JDBC驱动全量数据的个数
maxRows 设置ResultSet从数据库中一次读取记录的上限
queryTimeout 设置执行SQL语句的等待超时时间,单位秒。当超过这个时间会抛出DataAccessException
verifyCursorPosition 对游标位置进行校验。因为在RowMapper::mapRow方法中ResultSet是直接暴露给使用者的,所以有可能在业务代码层面调用了ResultSet::next方法。将这个属性设置为true,在框架中会有一个位置计数器与ResultSet保持一致,当执行完Reader后位置不一致会抛出异常。
saveState 标记读取的状态是否被存放到ExecutionContext中。默认为true
driverSupportsAbsolute 告诉框架是指直接使用ResultSet::absolute方法来指定游标位置,使用这个属性须要数据库驱动支持。建议在支持absolute特性的数据库上开启这个特性,可以明显的提高性能。默认为false
setUseSharedExtendedConnection 标记读取数据的游标是否与Step其余过程绑定成同一个事物。默认为false,表示读取数据的游标是单独创建链接的,具备自身独立的事物。若是设定为true须要用ExtendedConnectionDataSourceProxy包装DataSource用于管理事物过程。此时游标的建立标记为'READ_ONLY'、'HOLD_CURSORS_OVER_COMMIT'。须要注意的是该属性须要数据库支持3.0以上的JDBC驱动。

可执行源码

源码在下列地址的items子项目:less

执行JdbcCursorItemReader的代码在org.chenkui.spring.batch.sample.items.JdbcReader。启动位置是org.chenkui.spring.batch.sample.database.cursor.JdbcCurosrApplication

在运行代码以前请先在数据库中执行如下DDL语句,并添加部分测试数据。

CREATE TABLE `tmp_test_weather` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `siteid` varchar(64) NOT NULL COMMENT '业务主键',
  `month` varchar(64) NOT NULL COMMENT '日期',
  `type` varchar(64) NOT NULL COMMENT '气象类型',
  `value` int(11) NOT NULL COMMENT '值',
  `ext` varchar(255) DEFAULT NULL COMMENT '扩展数据',
  PRIMARY KEY (`id`)
) ;

运行代码:

//随风溜达的向日葵 chkui.com
public class JdbcReader {

    @Bean
    public RowMapper<WeatherEntity> weatherEntityRowMapper() {

        return new RowMapper<WeatherEntity>() {
            public static final String SITEID_COLUMN = "siteId"; // 设置映射字段
            public static final String MONTH_COLUMN = "month";
            public static final String TYPE_COLUMN = "type";
            public static final String VALUE_COLUMN = "value";
            public static final String EXT_COLUMN = "ext";

            @Override
            // 数据转换
            public WeatherEntity mapRow(ResultSet resultSet, int rowNum) throws SQLException {
                WeatherEntity weatherEntity = new WeatherEntity();
                weatherEntity.setSiteId(resultSet.getString(SITEID_COLUMN));
                weatherEntity.setMonth(resultSet.getString(MONTH_COLUMN));
                weatherEntity.setType(WeatherEntity.Type.valueOf(resultSet.getString(TYPE_COLUMN)));
                weatherEntity.setValue(resultSet.getInt(VALUE_COLUMN));
                weatherEntity.setExt(resultSet.getString(EXT_COLUMN));
                return weatherEntity;
            }
        };
    }

    @Bean
    public ItemReader<WeatherEntity> jdbcCursorItemReader(
        @Qualifier("weatherEntityRowMapper") RowMapper<WeatherEntity> rowMapper, DataSource datasource) {
        JdbcCursorItemReader<WeatherEntity> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(datasource); //设置DataSource
        //设置读取的SQL
        itemReader.setSql("SELECT siteId, month, type, value, ext from TMP_TEST_WEATHER"); 
        itemReader.setRowMapper(rowMapper); //设置转换
        return itemReader;
    }
}

HibernateCursorItemReader

在Java体系中数据库操做常见的规范有JPAORM,Spring Batch提供了HibernateCursorItemReader来实现HibernateTemplate,它能够经过Hibernate框架进行游标的控制。

须要注意的是:使用Hibernate框架来处理批量数据到目前为止一直都有争议,核心缘由是Hibernate最初是为在线联机事物型系统开发的。不过这并不意味着不能使用它来处理批数据,解决此问题就是让Hibernate使用StatelessSession用来保持游标,而不是standard session一次读写,这将致使Hibernate的缓存机制和数据脏读检查失效,进而影响批处理的过程。关于Hibernate的状态控制机制请阅读官方文档。

HibernateCursorItemReader使用过程与JdbcCursorItemReader没多大差别都是逐条读取数据而后控制状态连接关闭。只不过他提供了Hibernate所使用的HSQL方案。

@Bean
public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
    HibernateCursorItemReader<WeatherEntity> itemReader = new HibernateCursorItemReader<>();
    itemReader.setName("hibernateCursorItemReader");
    itemReader.setQueryString("from WeatherEntity tmp_test_weather");
    itemReader.setSessionFactory(sessionFactory);
    return itemReader;
}

public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
    return new HibernateCursorItemReaderBuilder<CustomerCredit>()
            .name("creditReader")
            .sessionFactory(sessionFactory)
            .queryString("from CustomerCredit")
            .build();
}

若是没有特别的须要,不推荐使用Hibernate

StoredProcedureItemReader

存储过程是在同一个数据库中处理大量数据的经常使用方法。StoredProcedureItemReader的执行过程和JdbcCursorItemReader一致,可是底层逻辑是先执行存储过程,而后返回存储过程执行结果游标。不一样的数据库存储过程游标返回会有一些差别:

  1. 做为一个ResultSet返回。(SQL Server, Sybase, DB2, Derby以及MySQL)
  2. 参数返回一个 ref-cursor实例。好比Oracle、PostgreSQL数据库,这类数据库存储过程是不会直接return任何内容的,须要从传参获取。
  3. 返回存储过程调用后的返回值。

针对以上3个类型,配置上有一些差别:

//随风溜达的向日葵 chkui.com
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
    StoredProcedureItemReader reader = new StoredProcedureItemReader();

    reader.setDataSource(dataSource);
    reader.setProcedureName("sp_processor_weather");
    reader.setRowMapper(new weatherEntityRowMapper());
	
    reader.setRefCursorPosition(1);//第二种类型须要指定ref-cursor的参数位置

    reader.setFunction(true);//第三种类型须要明确的告知reader经过返回获取

    return reader;
}

使用存储过程处理数据的好处是能够实现针对库内的数据进行合并、分割、排序等处理。若是数据在同一个数据库,性能也明显好于经过Java处理。

分页读取数据

相对于游标,还有一个办法是进行分页查询。分页查询意味着再进行批处理的过程当中同一个SQL会屡次执行。在联机型事物系统中分页查询经常使用于列表功能,每一次查询须要指定开始位置和结束位置。

JdbcPagingItemReader

分页查询的默认实现类是JdbcPagingItemReader,它的核心功能是用分页器PagingQueryProvider进行分页控制。因为不一样的数据库分页方法差异很大,因此针对不一样的数据库有不一样的实现类。框架提供了SqlPagingQueryProviderFactoryBean用于检查当前数据库并自动注入对应的PagingQueryProvider

JdbcPagingItemReader会从数据库中一次性读取一整页的数据,可是调用Reader的时候仍是会一行一行的返回数据。框架会自行根据运行状况肯定何时须要执行下一个分页的查询。

分页读取数据执行源码

执行JdbcPagingItemReader的代码在org.chenkui.spring.batch.sample.items.pageReader。启动位置是org.chenkui.spring.batch.sample.database.paging.JdbcPagingApplication

//随风溜达的向日葵 chkui.com
public class pageReader {
    final private boolean wrapperBuilder = false;
    @Bean
    //设置 queryProvider
    public SqlPagingQueryProviderFactoryBean queryProvider(DataSource dataSource) {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setDataSource(dataSource);
        provider.setSelectClause("select id, siteid, month, type, value, ext");
        provider.setFromClause("from tmp_test_weather");
        provider.setWhereClause("where id>:start");
        provider.setSortKey("id");

        return provider;
    }

    @Bean
    public ItemReader<WeatherEntity> jdbcPagingItemReader(DataSource dataSource,
            PagingQueryProvider queryProvider,
            RowMapper<WeatherEntity> rowMapper) {

        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("start", "1");
        JdbcPagingItemReader<WeatherEntity> itemReader;
        if (wrapperBuilder) {
            itemReader = new JdbcPagingItemReaderBuilder<WeatherEntity>()
                    .name("creditReader")
                    .dataSource(dataSource)
                    .queryProvider(queryProvider)
                    .parameterValues(parameterValues)
                    .rowMapper(rowMapper)
                    .pageSize(1000)
                    .build();
        } else {
            itemReader = new JdbcPagingItemReader<>();
            itemReader.setName("weatherEntityJdbcPagingItemReader");
            itemReader.setDataSource(dataSource);
            itemReader.setQueryProvider(queryProvider);
            itemReader.setParameterValues(parameterValues);
            itemReader.setRowMapper(rowMapper);
            itemReader.setPageSize(1000);
        }
        return itemReader;
    }
}

数据写入

Spring Batch为不一样类型的文件的写入提供了多个实现类,但并无为数据库的写入提供任何实现类,而是交由开发者本身去实现接口。理由是:

  1. 数据库的写入与文件写入有巨大的差异。对于一个Step而言,在写入一份文件时须要保持对文件的打开状态从而可以高效的向队尾添加数据。若是每次都从新打开文件,从开始位置移动到队尾会耗费大量的时间(不少文件流没法在open时就知道长度)。当整个Step结束时才能关闭文件的打开状态,框架提供的文件读写类都实现了这个控制过程。

  2. 另外不管使用何种方式将数据写入文件都是"逐行进行"的(流数据写入、字符串逐行写入)。所以当数据写入与整个Step绑定为事物时还须要实现一个控制过程是:在写入数据的过程当中出现异常时要擦除本次事物已经写入的数据,这样才能和整个Step的状态保持一致。框架中的类一样实现了这个过程。

  3. 可是向数据库写入数据并不须要相似于文件的尾部写入控制,由于数据库的各类连接池自己就保证了连接->写入->释放的高效执行,也不存在向队尾添加数据的问题。并且几乎全部的数据库驱动都提供了事物能力,在任什么时候候出现异常都会自动回退,不存在擦除数据的问题。

所以,对于数据库的写入操做只要按照常规的批量数据写入的方式便可,开发者使用任何工具均可以完成这个过程。

写入数据一个简单的实现

实现数据写入方法不少,这和常规的联机事务系统没任何区别。下面直接用JdbcTemplate实现了一个简单的数据库写入过程。

执行数据库写入的核心代码在org.chenkui.spring.batch.sample.items.JdbcWriter。启动位置是org.chenkui.spring.batch.sample.database.output.JdbcWriterApplication

//随风溜达的向日葵 chkui.com
public class JdbcWriter {

    @Bean
    public ItemWriter<WeatherEntity> jdbcBatchWriter(JdbcTemplate template) {

        return new ItemWriter<WeatherEntity>() {
            final private static String INSERt_SQL = 
                      "INSERT INTO tmp_test_weather(siteid, month, type, value, ext) VALUES(?,?,?,?,?)";
            @Override
            public void write(List<? extends WeatherEntity> items) throws Exception {
                List<Object[]> batchArgs = new ArrayList<>();
                for (WeatherEntity entity : items) {
                    Object[] objects = new Object[5];
                    objects[0] = entity.getSiteId();
                    objects[1] = entity.getMonth();
                    objects[2] = entity.getType().name();
                    objects[3] = entity.getValue();
                    objects[4] = entity.getExt();
                    batchArgs.add(objects);
                }
                template.batchUpdate(INSERt_SQL, batchArgs);
            }
        };
    }
}

组合使用案例

下面是一些组合使用过程,简单实现了文件到数据库、数据库到文件的过程。文件读写的过程已经在文件读写中介绍过,这里会重复使用以前介绍的文件读写的功能。

下面的案例是将data.csv中的数据写入到数据库,而后再将数据写入到out-data.csv。案例组合使用已有的item完成任务:flatFileReaderjdbcBatchWriterjdbcCursorItemReadersimpleProcessorflatFileWriter。这种ReaderProcessorWriter组合的方式也是完成一个批处理工程的常见开发方式。

案例的运行代码在org.chenkui.spring.batch.sample.database.complex包中,使用了2个Step来完成任务,一个将数据读取到数据库,一个将数据进行过滤,而后再写入到文件:

//随风溜达的向日葵 chkui.com
public class FileComplexProcessConfig {
    @Bean
    // 配置Step1
    public Step file2DatabaseStep(StepBuilderFactory builder,
            @Qualifier("flatFileReader") ItemReader<WeatherEntity> reader,
            @Qualifier("jdbcBatchWriter") ItemWriter<WeatherEntity> writer) {
        return builder.get("file2DatabaseStep") // 建立
                .<WeatherEntity, WeatherEntity>chunk(50) // 分片
                .reader(reader) // 读取
                .writer(writer) // 写入
                .faultTolerant() // 开启容错处理
                .skipLimit(20) // 跳过设置
                .skip(Exception.class) // 跳过异常
                .build();
    }

    @Bean
    // 配置Step2
    public Step database2FileStep(StepBuilderFactory builder, 
            @Qualifier("jdbcCursorItemReader") ItemReader<WeatherEntity> reader,
            @Qualifier("simpleProcessor") ItemProcessor<WeatherEntity, MaxTemperatureEntiry> processor,
            @Qualifier("flatFileWriter") ItemWriter<MaxTemperatureEntiry> writer) {
        return builder.get("database2FileStep") // 建立
                .<WeatherEntity, MaxTemperatureEntiry>chunk(50) // 分片
                .reader(reader) // 读取
                .processor(processor) //
                .writer(writer) // 写入
                .faultTolerant() // 开启容错处理
                .skipLimit(20) // 跳过设置
                .skip(Exception.class) // 跳过异常
                .build();
    }

    @Bean
    public Job file2DatabaseJob(@Qualifier("file2DatabaseStep") Step step2Database,
            @Qualifier("database2FileStep") Step step2File, JobBuilderFactory builder) {
        return builder.get("File2Database").start(step2Database).next(step2File).build();
    }
}
相关文章
相关标签/搜索