批处理任务的主要业务逻辑都是在Step
中去完成的。能够将Job
理解为运行Step
的框架,而Step
理解为业务功能。html
Step
是Job
中的工做单元,每个Step
涵盖了单行记录的处理闭环。下图是一个Step
的简要结构:spring
一个Step
一般涵盖三个部分:读数据(Reader)、处理数据(Processor)和写数据(Writer)。可是并非全部的Step
都须要自身来完成数据的处理,好比存储过程等方式是经过外部功能来完成,所以Spring Batch提供了2种Step的处理方式:1)面向分片的ChunkStep
,2)面向过程的TaskletStep
。可是基本上大部分状况下都是使用面向分片的方式来解决问题。数据库
在Step
中数据是按记录(按行)处理的,可是每条记录处理完毕以后立刻提交事物反而会致使IO的巨大压力。所以Spring Batch提供了数据处理的分片功能。设置了分片以后,一次工做会从Read开始,而后交由给Processor处理。处理完毕后会进行聚合,待聚合到必定的数量的数据以后一次性调用Write将数据提交到物理数据库。其过程大体为:缓存
在Spring Batch中所谓的事物和数据事物的概念同样,就是一次性提交多少数据。若是在聚合数据期间出现任何错误,全部的这些数据都将不执行写入。安全
@Bean public Job sampleJob(JobRepository jobRepository, Step sampleStep) { return this.jobBuilderFactory.get("sampleJob") .repository(jobRepository) .start(sampleStep) .build(); } @Bean public Step sampleStep(PlatformTransactionManager transactionManager) { return this.stepBuilderFactory.get("sampleStep") .transactionManager(transactionManager) .<String, String>chunk(10) //分片配置 .reader(itemReader()) //reader配置 .writer(itemWriter()) //write配置 .build(); }
观察sampleStep方法:app
PlatformTransactionManager
对事物进行管理。当配置好事物以后Spring Batch会自动对事物进行管理,无需开发人员显示操做。是否使用ItemProcessor
是一个可选项。若是没有Processor能够将数据视为读取并直接写入。框架
Step
使用PlatformTransactionManager
管理事物。每次事物提交的间隔根据chunk
方法中配置的数据执行。若是设置为1,那么在每一条数据处理完以后都会调用ItemWrite
进行提交。提交间隔设置过小,那么会浪费须要多没必要要的资源,提交间隔设置的太长,会致使事物链太长占用空间,而且出现失败会致使大量数据回滚。所以设定一个合理的间隔是很是必要的,这须要根据实际业务状况、性能要求、以及数据安全程度来设定。若是没有明确的评估目标,设置为10~20较为合适。异步
前文介绍了Job
的重启,可是每次重启对Step
也是有很大的影响的,所以须要特定的配置。jvm
某些Step
可能用于处理一些先决的任务,因此当Job再次重启时这Step
就不必再执行,能够经过设置startLimit来限定某个Step
重启的次数。当设置为1时候表示仅仅运行一次,而出现重启时将再也不执行:ide
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .startLimit(1) .build(); }
在单个JobInstance
的上下文中,若是某个Step
已经处理完毕(COMPLETED)那么在默认状况下重启以后这个Step
并不会再执行。能够经过设置allow-start-if-complete
为true告知框架每次重启该Step
都要执行:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .allowStartIfComplete(true) .build(); }
某些时候在任务处理单个记录时中出现失败并不该该中止任务,而应该跳过继续处理下一条数据。是否跳过须要根据业务来断定,所以框架提供了跳过机制交给开发人员使用。如何配置跳过机制:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(flatFileItemReader()) .writer(itemWriter()) .faultTolerant() .skipLimit(10) .skip(FlatFileParseException.class) .build(); }
代码的含义是当处理过程当中抛出FlatFileParseException
异常时就跳过该条记录的处理。skip-limit
(skipLimit方法)配置的参数表示当跳过的次数超过数值时则会致使整个Step
失败,从而中止继续运行。还能够经过反向配置的方式来忽略某些异常:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(flatFileItemReader()) .writer(itemWriter()) .faultTolerant() .skipLimit(10) .skip(Exception.class) .noSkip(FileNotFoundException.class) .build(); }
skip
表示要当捕捉到Exception异常就跳过。可是Exception有不少继承类,此时可使用noSkip
方法指定某些异常不能跳过。
当处理记录出个异常以后并不但愿他当即跳过或者中止运行,而是但愿能够屡次尝试执行直到失败:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .faultTolerant() .retryLimit(3) .retry(DeadlockLoserDataAccessException.class) .build(); }
retry(DeadlockLoserDataAccessException.class)
表示只有捕捉到该异常才会重试,retryLimit(3)
表示最多重试3次,faultTolerant()
表示启用对应的容错功能。
默认状况下,不管是设置了重试(retry)仍是跳过(skip),只要从Writer
抛出一个异常都会致使事物回滚。若是配置了skip机制,那么在Reader
中抛出的异常不会致使回滚。有些从Writer
抛出一个异常并不须要回滚数据,noRollback
属性为Step
提供了没必要进行事物回滚的异常配置:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .faultTolerant() .noRollback(ValidationException.class) //没必要回滚的异常 .build(); }
一次Setp
分为Reader
、Processor
和Writer
三个阶段,这些阶段统称为Item
。默认状况下若是错误不是发生在Reader阶段,那么不必再去从新读取一次数据。可是某些场景下须要Reader部分也须要从新执行,好比Reader是从一个JMS队列中消费消息,当发生回滚的时候消息也会在队列上重放,所以也要将Reader归入到回滚的事物中,根据这个场景可使用readerIsTransactionalQueue
来配置数据重读:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .readerIsTransactionalQueue() //数据重读 .build(); }
事物的属性包括隔离等级(isolation)、传播方式(propagation)以及过时时间(timeout)。关于事物的控制详见Spring Data Access的说明,下面是相关配置的方法:
@Bean public Step step1() { //配置事物属性 DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); attribute.setPropagationBehavior(Propagation.REQUIRED.value()); attribute.setIsolationLevel(Isolation.DEFAULT.value()); attribute.setTimeout(30); return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .transactionAttribute(attribute) //设置事物属性 .build(); }
ItemStream
是用于每个阶段(Reader、Processor、Writer)的“生命周期回调数据处理器”,后续的文章会详细介绍ItemStream
。在4.×版本以后默认注入注册了通用的ItemStream
。
有2种方式将ItemStream
注册到Step
中,一是使用stream
方法:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(compositeItemWriter()) .stream(fileItemWriter1()) .stream(fileItemWriter2()) .build(); }
二是使用相关方法的代理:
@Bean public CompositeItemWriter compositeItemWriter() { List<ItemWriter> writers = new ArrayList<>(2); writers.add(fileItemWriter1()); writers.add(fileItemWriter2()); CompositeItemWriter itemWriter = new CompositeItemWriter(); itemWriter.setDelegates(writers); return itemWriter; }
在Step
执行的过程当中会产生各类各样的事件,开发人员能够利用各类Listener
接口对Step
及Item
进行监听。一般在建立一个Step的时候添加拦截器:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(reader()) .writer(writer()) .listener(chunkListener()) //添加拦截器 .build(); }
Spring Batch提供了多个接口以知足不一样事件的监听。
StepExecutionListener
能够看作一个通用的Step
拦截器,他的做用是在Step开始以前和结束以后进行拦截处理:
public interface StepExecutionListener extends StepListener { void beforeStep(StepExecution stepExecution); //Step执行以前 ExitStatus afterStep(StepExecution stepExecution); //Step执行完毕以后 }
在结束的时候开发人员能够本身定义返回的ExitStatus
,用于配合流程控制(见后文)实现对整个Step执行过程的控制。
ChunkListener
是在数据事物发生的两端被触发。chunk
的配置决定了处理多少项记录才进行一次事物提交,ChunkListener
的做用就是对一次事物开始以后或事物提交以后进行拦截:
public interface ChunkListener extends StepListener { void beforeChunk(ChunkContext context); //事物开始以后,ItemReader调用以前 void afterChunk(ChunkContext context); //事物提交以后 void afterChunkError(ChunkContext context); //事物回滚以后 }
若是没有设定chunk也可使用ChunkListener
,它会被TaskletStep
调用(TaskletStep
见后文)。
该接口用于对Reader
相关的事件进行监控:
public interface ItemReadListener<T> extends StepListener { void beforeRead(); void afterRead(T item); void onReadError(Exception ex); }
beforeRead
在每次Reader
调用以前被调用,afterRead
在每次Reader
成功返回以后被调用,而onReadError
会在出现异常以后被调用,能够将其用于记录异常日志。
ItemProcessListener
和ItemReadListener
相似,是围绕着ItemProcessor
进行处理的:
public interface ItemProcessListener<T, S> extends StepListener { void beforeProcess(T item); //processor执行以前 void afterProcess(T item, S result); //processor直线成功以后 void onProcessError(T item, Exception e); //processor执行出现异常 }
ItemWriteListener
的功能和ItemReadListener
、ItemReadListener
相似,可是须要注意的是它接收和处理的数据对象是一个List
。List
的长度与chunk配置相关。
public interface ItemWriteListener<S> extends StepListener { void beforeWrite(List<? extends S> items); void afterWrite(List<? extends S> items); void onWriteError(Exception exception, List<? extends S> items); }
ItemReadListener
、ItemProcessListener
和ItemWriteListener
都提供了错误拦截处理的机制,可是没有处理跳过(skip)的数据记录。所以框架提供了SkipListener
来专门处理那么被跳过的记录:
public interface SkipListener<T,S> extends StepListener { void onSkipInRead(Throwable t); //Read期间致使跳过的异常 void onSkipInProcess(T item, Throwable t); //Process期间致使跳过的异常 void onSkipInWrite(S item, Throwable t); //Write期间致使跳过的异常 }
SkipListener
的价值是能够将那些未能成功处理的记录在某个位置保存下来,而后交给其余批处理进一步解决,或者人工来处理。Spring Batch保证如下2个特征:
SkipListener
始终在事物提交以前被调用,这样能够保证监听器使用的事物资源不会被业务事物影响。面向分片(Chunk-oriented processing )的过程并非Step的惟一执行方式。好比用数据库的存储过程来处理数据,这个时候使用标准的Reader、Processor、Writer会很奇怪,针对这些状况框架提供了TaskletStep
。
TaskletStep
是一个很是简单的接口,仅有一个方法——execute
。TaskletStep
会反复的调用这个方法直到获取一个RepeatStatus.FINISHED
返回或者抛出一个异常。全部的Tasklet
调用都会包装在一个事物中。
注册一个TaskletStep
很是简单,只要添加一个实现了Tasklet
接口的类便可:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .tasklet(myTasklet()) //注入Tasklet的实现 .build(); }
TaskletStep
还支持适配器处理等,详见官网说明。
默认状况下。Step与Step之间是顺序执行的,以下图:
顺序执行经过next
方法来标记:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(stepA()) .next(stepB()) //顺序执行 .next(stepC()) .build(); }
在顺序执行的过程当中,在整个执行链条中有一个Step
执行失败则整个Job
就会中止。可是经过条件执行,能够指定各类状况下的执行分支:
为了实现更加复杂的控制,能够经过Step
执行后的退出命名来定义条件分之。先看一个简单的代码:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(stepA()) //启动时执行的step .on("*").to(stepB()) //默认跳转到stepB .from(stepA()).on("FAILED").to(stepC()) //当返回的ExitStatus为"FAILED"时,执行。 .end() .build(); }
这里使用*来表示默认处理,*是一个通配符表示处理任意字符串,对应的还可使用?表示匹配任意字符。在Spring Batch(1)——数据批处理概念一文中介绍了Step的退出都会有ExitStatus
,命名都来源于它。下面是一个更加全面的代码。
public class SkipCheckingListener extends StepExecutionListenerSupport { public ExitStatus afterStep(StepExecution stepExecution) { String exitCode = stepExecution.getExitStatus().getExitCode(); if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) && stepExecution.getSkipCount() > 0) { //当Skip的Item大于0时,则指定ExitStatus的内容 return new ExitStatus("COMPLETED WITH SKIPS"); } else { return null; } } }
拦截器指示当有一个以上被跳过的记录时,返回的ExitStatus
为"COMPLETED WITH SKIPS"。对应的控制流程:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()).on("FAILED").end() //执行失败直接退出 .from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1()) //有跳过元素执行 errorPrint1() .from(step1()).on("*").to(step2()) //默认(成功)状况下执行 Step2 .end() .build(); }
Spring Batch为Job
提供了三种退出机制,这些机制为批处理的执行提供了丰富的控制方法。在介绍退出机制以前须要回顾一下 数据批处理概念一文中关于StepExecution
的内容。在StepExecution
中有2个表示状态的值,一个名为status
,另一个名为exitStatus
。前者也被称为BatchStatus
。
前面以及介绍了ExitStatus
的使用,他能够控制Step执行链条的条件执行过程。除此以外BatchStatus
也会参与到过程的控制。
默认状况下(没有使用end
、fail
方法结束),Job
要顺序执行直到退出,这个退出称为end
。这个时候,BatchStatus
=COMPLETED
、ExitStatus
=COMPLETED
,表示成功执行。
除了Step
链式处理天然退出,也能够显示调用end
来退出系统。看下面的例子:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()) //启动 .next(step2()) //顺序执行 .on("FAILED").end() .from(step2()).on("*").to(step3()) //条件执行 .end() .build(); }
上面的代码,step1
到step2
是顺序执行,当step2
的exitStatus
返回"FAILED"时则直接End退出。其余状况执行Step3
。
除了end
还可使用fail
退出,这个时候,BatchStatus
=FAILED
、ExitStatus
=EARLY TERMINATION
,表示执行失败。这个状态与End
最大的区别是Job
会尝试重启执行新的JobExecution
。看下面代码的例子:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()) //执行step1 .next(step2()).on("FAILED").fail() //step2的ExitStatus=FAILED 执行fail .from(step2()).on("*").to(step3()) //不然执行step3 .end() .build(); }
Spring Batch还支持在指定的节点退出,退出后下次重启会从中断的点继续执行。中断的做用是某些批处理到某个步骤后须要人工干预,当干预完以后又接着处理:
@Bean public Job job() { return this.jobBuilderFactory.get("job") //若是step1的ExitStatus=COMPLETED则在step2中断 .start(step1()).on("COMPLETED").stopAndRestart(step2()) //不然直接退出批处理 .end() .build(); }
能够直接进行编码来控制Step
之间的扭转,Spring Batch提供了JobExecutionDecider
接口来协助分支管理:
public class MyDecider implements JobExecutionDecider { public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { String status; if (someCondition()) { status = "FAILED"; } else { status = "COMPLETED"; } return new FlowExecutionStatus(status); } }
接着将MyDecider
做为过滤器添加到配置过程当中:
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()) .next(decider()).on("FAILED").to(step2()) .from(decider()).on("COMPLETED").to(step3()) .end() .build(); }
在线性处理过程当中,流程都是一个接着一个执行的。可是为了知足某些特殊的须要,Spring Batch提供了执行的过程分裂并行Step
的方法。参看下面的Job
配置:
@Bean public Job job() { Flow flow1 = new FlowBuilder<SimpleFlow>("flow1") .start(step1()) .next(step2()) .build();//并行流程1 Flow flow2 = new FlowBuilder<SimpleFlow>("flow2") .start(step3()) .build();//并行流程2 return this.jobBuilderFactory.get("job") .start(flow1) .split(new SimpleAsyncTaskExecutor()) //建立一个异步执行任务 .add(flow2) .next(step4()) //2个分支执行完毕以后再执行step4。 .end() .build(); }
这里表示flow1和flow2会并行执行,待2者执行成功后执行step4。
在Job
或Step
的任何位置,均可以获取到统一配置的数据。好比使用标准的Spring Framework方式:
@Bean public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
当咱们经过配置文件(application.properties中 input.file.name=filepath
)或者jvm参数(-Dinput.file.name=filepath
)指定某些数据时,均可以经过这种方式获取到对应的配置参数。
此外,也能够从JobParameters
从获取到Job
运行的上下文参数:
@StepScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
不管是JobExecution
仍是StepExecution
,其中的内容均可以经过这种方式去获取参数,例如:
@StepScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
或者
@StepScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
注意看上面的代码例子,都有一个@StepScope
注解。这是为了进行后期绑定进行的标识。由于在Spring的IoCs容器进行初始化的阶段并无任何的*Execution
在执行,进而也不存在任何*ExecutionContext
,因此这个时候根本没法注入标记的数据。因此须要使用注解显式的告诉容器直到Step
执行的阶段才初始化这个@Bean
。
Job Scope的概念和 Step Scope相似,都是用于标识在到了某个执行时间段再添加和注入Bean。@JobScope
用于告知框架知道JobInstance
存在时候才初始化对应的@Bean
:
@JobScope @Bean // 初始化获取 jobParameters中的参数 public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
@JobScope @Bean // 初始化获取jobExecutionContext中的参数 public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext中的参数['input.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }