在Spring batch由上至下的结构中Job、Step都是属于框架级别的的功能,大部分时候都是提供一些配置选项给开发人员使用,而Item中的Reader
、Processor
和Writer
是属于业务级别的,它开放了一些业务切入的接口。 可是文件的读写过程当中有不少通用一致的功能Spring Batch为这些相同的功能提供了一致性实现类。git
扁平结构文件(也称为矩阵结构文件,后文简称为文件)是最多见的一种文件类型。他一般以一行表示一条记录,字段数据之间用某种方式分割。与标准的格式数据(xml、json等)主要差异在于他没有结构性描述方案(SXD、JSON-SCHEME),进而没有结构性分割规范。所以在读写此类文件以前须要先设定好字段的分割方法。github
文件的字段数据分割方式一般有两种:使用分隔符或固定字段长度。前者一般使用逗号(,
)之类的符号对字段数据进行划分,后者的每一列字段数据长度是固定的。 框架为文件的读取提供了FieldSet
用于将文件结构中的信息映射到一个对象。FieldSet
的做用是将文件的数据与类的field
进行绑定(field是Java中常见的概念,不清楚的能够了解Java反射)。spring
Spring Batch为文件读取提供了FlatFileItemReader
类,它为文件中的数据的读取和转换提供了基本功能。在FlatFileItemReader
中有2个主要的功能接口,一是Resource
、二是LineMapper
。 Resource
用于外部文件获取,详情请查看Spring核心——资源管理部分的内容,下面是一个例子:数据库
Resource resource = new FileSystemResource("resources/trades.csv");
在复杂的生产环境中,文件一般由中心化、或者流程式的基础框架来管理(好比EAI)。所以文件每每须要使用FTP等方式从其余位置获取。如何迁移文件已经超出了Spring Batch框架的范围,在Spring的体系中能够参考Spring Integration
项目。json
下面是FlatFileItemReader
的属性,每个属性都提供了Setter方法。app
属性名 | 参数类型 | 说明 |
---|---|---|
comments | String[] | 指定文件中的注释前缀,用于过滤注释内容行 |
encoding | String | 指定文件的编码方式,默认为Charset.defaultCharset() |
lineMapper | LineMapper | 利用LineMapper接口将一行字符串转换为对象 |
linesToSkip | int | 跳过文件开始位置的行数,用于跳过一些字段的描述行 |
recordSeparatorPolicy | RecordSeparatorPolicy | 用于判断数据是否结束 |
resource | Resource | 指定外部资源文件位置 |
skippedLinesCallback | LineCallbackHandler | 当配置linesToSkip,每执行一次跳过都会被回调一次,会传入跳过的行数据内容 |
每一个属性都为文件的解析提供了某方面的功能,下面是结构的说明。框架
这个接口的做用是将字符串转换为对象:curl
public interface LineMapper { T mapLine(String line, int lineNumber) throws Exception; }
接口的基本处理逻辑是聚合类(FlatFileItemReader
)传递一行字符串以及行号给LineMapper::mapLine
,方法处理后返回一个映射的对象。ide
这个接口的做用是将一行数据转换为一个FieldSet
结构。对于Spring Batch而言,扁平结构文件的到Java实体的映射都经过FieldSet
来控制,所以读写文件的过程须要完成字符串到FieldSet
的转换:工具
public interface LineTokenizer { FieldSet tokenize(String line); }
这个接口的含义是:传递一行字符串数据,而后获取一个FieldSet
。
框架为LineTokenizer
提供三个实现类:
DelimitedLineTokenizer
:利用分隔符将数据转换为FieldSet
。最多见的分隔符是逗号,
,类提供了分隔符的配置和解析方法。
FixedLengthTokenizer
:根据字段的长度来解析出FieldSet
结构。必须为记录定义字段宽度。
PatternMatchingCompositeLineTokenizer
:使用一个匹配机制来动态决定使用哪一个LineTokenizer
。
该接口是将FieldSet
转换为对象:
public interface FieldSetMapper { T mapFieldSet(FieldSet fieldSet) throws BindException; }
FieldSetMapper
一般和LineTokenizer
联合在一块儿使用:String->FieldSet->Object。
DefaultLineMapper
是LineMapper
的实现,他实现了从文件到Java实体的映射:
public class DefaultLineMapper implements LineMapper<>, InitializingBean { private LineTokenizer tokenizer; private FieldSetMapper fieldSetMapper; public T mapLine(String line, int lineNumber) throws Exception { return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line)); } public void setLineTokenizer(LineTokenizer tokenizer) { this.tokenizer = tokenizer; } public void setFieldSetMapper(FieldSetMapper fieldSetMapper) { this.fieldSetMapper = fieldSetMapper; } }
在解析文件时数据是按行解析的:
LineTokenizer
将字符串解析为FieldSet
结构。FieldSetMapper
继续解析为一个Java实体对象返回给调用者。DefaultLineMapper
是框架提供的默认实现类,看似很是简单,可是利用组合模式能够扩展出不少功能。
在转换过程当中若是将FieldSet
的names
属性与目标类的field
绑定在一块儿,那么能够直接使用反射实现数据转换,为此框架提供了BeanWrapperFieldSetMapper
来实现。
DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); //建立LineMapper DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); //建立LineTokenizer tokenizer.setNames(new String[] { "siteId", "month", "type", "value", "ext" }); //设置Field名称 BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper = new BeanWrapperFieldSetMapper<>(); //建立FieldSetMapper wrapperMapper.setTargetType(WeatherEntity.class); //设置实体,实体的field名称必须和tokenizer.names一致。 // 组合lineMapper lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(wrapperMapper);
上面提到了各类接口和实现,实际上都是围绕着FlatFileItemReader
的属性在介绍,虽然内容不少可是实际上就如下几点:
首先要定位文件,Spring Batch提供了Resource
相关的定位方法。
其次是将文件中的行字符串数据转换为对象,LineMapper
的功能就是完成这个功能。
框架为LineMapper
提供了DefaultLineMapper
做为默认实现方法,在DefaultLineMapper
中须要组合使用LineTokenizer
和FieldSetMapper
。前者将字符串转为为一个Field
,后者将Field
转换为目标对象。
LineTokenizer
有3个实现类可供使用、FieldSetMapper
有一个默认实现类BeanWrapperFieldSetMapper
。
可执行的源码在下列地址的items子工程中:
运行以前须要配置数据库连接,参看源码库中的README.md。
文件读取的主要逻辑在org.chenkui.spring.batch.sample.items.FlatFileReader
类:
public class FlatFileReader { // FeildSet的字段名,设置字段名以后能够直接使用名字做为索引获取数据。也可使用索引位置来获取数据 public final static String[] Tokenizer = new String[] { "siteId", "month", "type", "value", "ext" }; private boolean userWrapper = false; @Bean //定义FieldSetMapper用于FieldSet->WeatherEntity public FieldSetMapper<WeatherEntity> fieldSetMapper() { return new FieldSetMapper<WeatherEntity>() { @Override public WeatherEntity mapFieldSet(FieldSet fieldSet) throws BindException { if (null == fieldSet) { return null; // fieldSet不存在则跳过该行处理 } else { WeatherEntity observe = new WeatherEntity(); observe.setSiteId(fieldSet.readRawString("siteId")); //Setter return observe; } } }; } @Bean // 配置 Reader public ItemReader<WeatherEntity> flatFileReader( @Qualifier("fieldSetMapper") FieldSetMapper<WeatherEntity> fieldSetMapper) { FlatFileItemReader<WeatherEntity> reader = new FlatFileItemReader<>(); reader.setResource(new FileSystemResource("src/main/resources/data.csv")); // 读取资源文件 DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); // 初始化 LineMapper实现类 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); // 建立LineTokenizer接口实现 tokenizer.setNames(Tokenizer); // 设定每一个字段的名称,若是不设置须要使用索引获取值 lineMapper.setLineTokenizer(tokenizer); // 设置tokenizer工具 if (userWrapper) { //使用 BeanWrapperFieldSetMapper 使用反射直接转换 BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper = new BeanWrapperFieldSetMapper<>(); wrapperMapper.setTargetType(WeatherEntity.class); fieldSetMapper = wrapperMapper; } lineMapper.setFieldSetMapper(fieldSetMapper); reader.setLineMapper(lineMapper); reader.setLinesToSkip(1); // 跳过的初始行,用于过滤字段行 reader.open(new ExecutionContext()); return reader; } }
除了按照分隔符,有些文件能够字段数据的占位长度来提取数据。按照前面介绍的过程,实际上只要修改LineTokenizer接口便可,框架提供了FixedLengthTokenizer
类:
@Bean public FixedLengthTokenizer fixedLengthTokenizer() { FixedLengthTokenizer tokenizer = new FixedLengthTokenizer(); tokenizer.setNames("ISIN", "Quantity", "Price", "Customer"); //Range用于设定数据的长度。 tokenizer.setColumns(new Range(1-12), new Range(13-15), new Range(16-20), new Range(21-29)); return tokenizer; }
将数据写入到文件与读取的过程正好相反:将对象转换为字符串。
与LineMapper
相对应的是LineAggregator
,他的功能是将实体转换为字符串:
public interface LineAggregator<T> { public String aggregate(T item); }
框架为LineAggregator
接口提供了一个很是简单的实现类——PassThroughLineAggregator
,其惟一实现就是使用对象的toString
方法:
public class PassThroughLineAggregator<T> implements LineAggregator<T> { public String aggregate(T item) { return item.toString(); } }
LineAggregator
的另一个实现类是DelimitedLineAggregator
。与PassThroughLineAggregator
简单直接使用toString
方法不一样的是,DelimitedLineAggregator
须要一个转换接口FieldExtractor
:
DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>(); lineAggregator.setDelimiter(","); lineAggregator.setFieldExtractor(fieldExtractor);
FieldExtractor
用于实体类到collection
结构的转换。它能够和LineTokenizer
进行类比,前者是将实体类转换为扁平结构的数据,后者是将String
转换为一个FieldSet
结构。
public interface FieldExtractor<T> { Object[] extract(T item); }
框架为FieldExtractor
接口提供了一个基于反射的实现类BeanWrapperFieldExtractor
,其过程就是将实体对象转换为列表:
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>(); fieldExtractor.setNames(new String[] {"field1", "field2"});
setName
方法用于指定要转换的field
列表。
文件读取的逻辑很是简单:文件存在打开文件并写入数据,当文件不存在抛出异常。可是写入文件明显不能这么简单粗暴。新建一个JobInstance
时最直观的操做是:存在同名文件就抛出异常,不存在则建立文件并写入数据。可是这样作显然有很大的问题,当批处理过程当中出现问题须要restart
,此时并不会从头开始处理全部的数据,而是要求文件存在并接着继续写入。为了确保这个过程FlatFileItemWriter
默认会在新JobInstance
运行时删除已有文件,而运行重启时继续在文件末尾写入。FlatFileItemWriter
可使用shouldDeleteIfExists
、appendAllowed
、shouldDeleteIfEmpty
来有针对性的控制文件。
文件写入主要代码在org.chenkui.spring.batch.sample.items.FlatFileWriter
:
public class FlatFileWriter { private boolean useBuilder = true; @Bean public ItemWriter<MaxTemperatureEntiry> flatFileWriter() { BeanWrapperFieldExtractor<MaxTemperatureEntiry> fieldExtractor = new BeanWrapperFieldExtractor<>(); fieldExtractor.setNames(new String[] { "siteId", "date", "temperature" }); //设置映射field fieldExtractor.afterPropertiesSet(); //参数检查 DelimitedLineAggregator<MaxTemperatureEntiry> lineAggregator = new DelimitedLineAggregator<>(); lineAggregator.setDelimiter(","); //设置输出分隔符 lineAggregator.setFieldExtractor(fieldExtractor); //设置FieldExtractor处理器 FlatFileItemWriter<MaxTemperatureEntiry> fileWriter = new FlatFileItemWriter<>(); fileWriter.setLineAggregator(lineAggregator); fileWriter.setResource(new FileSystemResource("src/main/resources/out-data.csv")); //设置输出文件位置 fileWriter.setName("outpufData"); if (useBuilder) {//使用builder方式建立 fileWriter = new FlatFileItemWriterBuilder<MaxTemperatureEntiry>().name("outpufData") .resource(new FileSystemResource("src/main/resources/out-data.csv")).lineAggregator(lineAggregator) .build(); } return fileWriter; } }
文件的写入过程与读取过程彻底对称相反:先用FieldExtractor
将对象转换为一个collection
结构(列表),而后用lineAggregator
将collection
转化为带分隔符的字符串。
代码中的测试数据来自数据分析交流项目bi-process-example,是NOAA的2015年全球天气监控数据。为了便于源码存储进行了大量的删减,原始数据有百万条,若有须要使用下列方式下载:
curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2015.csv.gz #数据文件 curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt # 文件结构及类型说明
代码实现了读取文件、处理文件、写入文件的整个过程。处理文件的过程是只获取监控的最高温度信息(Type=TMAX
),其余都过滤。
本案例的代码使用org.chenkui.spring.batch.sample.flatfile.FlatFileItemApplication::main
方法运行,使用的是Command Runner的方式执行(运行方式的说明见Item概念及使用代码的命令行方式运行、Java内嵌运行)。