在大型企业中,因为业务复杂、数据量大、数据格式不一样、数据交互格式繁杂,并不是全部的操做都能经过交互界面进行处理。而有一些操做须要按期读取大批量的数据,而后进行一系列的后续处理。这样的过程就是“批处理”。web
批处理应用一般有如下特色:spring
- 数据量大,从数万到数百万甚至上亿不等;
- 整个过程所有自动化,并预留必定接口进行自定义配置;
- 这样的应用一般是周期性运行,好比按日、周、月运行;
- 对数据处理的准确性要求高,而且须要容错机制、回滚机制、完善的日志监控等。
什么是Spring batch
Spring batch是一个轻量级的全面的批处理框架,它专为大型企业而设计,帮助开发健壮的批处理应用。Spring batch为处理大批量数据提供了不少必要的可重用的功能,好比日志追踪、事务管理、job执行统计、重启job和资源管理等。同时它也提供了优化和分片技术用于实现高性能的批处理任务。sql
它的核心功能包括:数据库
- 事务管理
- 基于块的处理过程
- 声明式的输入/输出操做
- 启动、终止、重启任务
- 重试/跳过任务
- 基于Web的管理员接口
笔者所在的部门属于国外某大型金融公司的CRM部门,在平常工做中咱们常常须要开发一些批处理应用,对Spring Batch有着丰富的使用经验。近段时间笔者特地总结了这些经验。安全
使用Spring Batch 3.0以及Spring Boot
在使用Spring Batch时推荐使用最新的Spring Batch 3.0版本。相比Spring Batch2.2,它作了如下方面的提高:多线程
- 支持JSR-352标准
- 支持Spring4以及Java8
- 加强了Spring Batch Integration的功能
- 支持JobScope
- 支持SQLite
支持Spring4和Java8是一个重大的提高。这样就可使用Spring4引入的Spring boot组件,从而开发效率方面有了一个质的飞跃。引入Spring-batch框架只须要在build.gradle中加入一行代码便可:app
1
|
compile("org.springframework.boot:spring-boot-starter-batch") |
而加强Spring Batch Integration的功能后,咱们就能够很方便的和Spring家族的其余组件集成,还能够以多种方式来调用job,也支持远程分区操做以及远程块处理。框架
而支持JobScope后咱们能够随时为对象注入当前Job实例的上下文信息。只要咱们制定Bean的scope为job scope,那么就能够随时使用jobParameters和jobExecutionContext等信息。运维
1 2 3 4 5 6 7 |
<bean id="..." class="..." scope="job"> <property name="name" value="#{jobParameters[input]}" /> </bean> <bean id="..." class="..." scope="job"> <property name="name" value="#{jobExecutionContext['input.name']}.txt" /> </bean> |
使用Java Config而不是xml的配置方式
以前咱们在配置job和step的时候都习惯用xml的配置方式,可是随着时间的推移发现问题颇多。async
- xml文件数急剧膨胀,配置块长且复杂,可读性不好;
- xml文件缺乏语法检查,有些低级错误只有在运行集成测试的时候才能发现;
- 在xml文件中进行代码跳转时IDE的支持力度不够;
咱们渐渐发现使用纯Java类的配置方式更灵活,它是类型安全的,并且IDE的支持更好。在构建job或step时采用的流式语法相比xml更加简洁易懂。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Bean public Step step(){ return stepBuilders.get("step") .<Partner,Partner>chunk(1) .reader(reader()) .processor(processor()) .writer(writer()) .listener(logProcessListener()) .faultTolerant() .skipLimit(10) .skip(UnknownGenderException.class) .listener(logSkipListener()) .build(); } |
在这个例子中能够很清楚的看到该step的配置,好比reader/processor/writer组件,以及配置了哪些listener等。
本地集成测试中使用内存数据库
Spring batch在运行时须要数据库支持,由于它须要在数据库中创建一套schema来存储job和step运行的统计信息。而在本地集成测试中咱们能够借助Spring batch提供的内存Repository来存储Spring batch的任务执行信息,这样即避免了在本地配置一个数据库,又能够加快job的执行。
1 2 3 4 |
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager"/> </bean> |
咱们在build.gradle中加入对hsqldb的依赖:
1
|
runtime(‘org.hsqldb:hsqldb:2.3.2’) |
而后在测试类中添加对DataSource的配置。
1 2 3 4 5 6 7 |
@EnableAutoConfiguration @EnableBatchProcessing @DataJpaTest @Import({DataSourceAutoConfiguration.class, BatchAutoConfiguration.class}) public class TestConfiguration { } |
而且在applicaton.properties配置中添加初始化Database的配置:
1
|
spring.batch.initializer.enable=true |
合理的使用Chunk机制
Spring batch在配置Step时采用的是基于Chunk的机制。即每次读取一条数据,再处理一条数据,累积到必定数量后再一次性交给writer进行写入操做。这样能够最大化的优化写入效率,整个事务也是基于Chunk来进行。
当咱们在须要将数据写入到文件、数据库中之类的操做时能够适当设置Chunk的值以知足写入效率最大化。但有些场景下咱们的写入操做实际上是调用一个web service或者将消息发送到某个消息队列中,那么这些场景下咱们就须要设置Chunk的值为1,这样既能够及时的处理写入,也不会因为整个Chunk中发生异常后,在重试时出现重复调用服务或者重复发送消息的状况。
使用Listener来监视job执行状况并及时作相应的处理
Spring batch提供了大量的Listener来对job的各个执行环节进行全面的监控。
在job层面Spring batch提供了JobExecutionListener接口,其支持在Job开始或结束时进行一些额外处理。在step层面Spring batch提供了StepExecutionListener,ChunkListener,ItemReadListener,ItemProcessListener,ItemWriteListener,SkipListener等接口,同时对Retry和Skip操做也提供了RetryListener及SkipListener。
一般咱们会为每一个job都实现一个JobExecutionListener,在afterJob操做中咱们输出job的执行信息,包括执行时间、job参数、退出代码、执行的step以及每一个step的详细信息。这样不管是开发、测试仍是运维人员对整个job的执行状况了如指掌。
若是某个step会发生skip的操做,咱们也会为其实现一个SkipListener,并在其中记录skip的数据条目,用于下一步的处理。
实现Listener有两种方式,一种是继承自相应的接口,好比继承JobExecutionListener接口,另外一种是使用annoation(注解)的方式。通过实践咱们认为使用注解的方式更好一些,由于使用接口你须要实现接口的全部方法,而使用注解则只须要对相应的方法添加annoation便可。
下面的这个类采用了继承接口的方式,咱们看到其实咱们只用到了第一个方法,第二个和第三个都没有用到。可是咱们必须提供一个空的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public class CustomSkipListener implements SkipListener<String, String> { @Override public void onSkipInRead(Throwable t) { // business logic } @Override public void onSkipInWrite(String item, Throwable t) { // no need } @Override public void onSkipInProcess(String item, Throwable t) { // no need } } |
而使用annoation的方式能够简写为:
1 2 3 4 5 6 7 |
public class CustomSkipListener { @OnSkipInRead public void onSkipInRead(Throwable t) { // business logic } } |
使用Retry和Skip加强批处理工做的健壮性
在处理百万级的数据过程过程当中不免会出现异常。若是一旦出现异常而致使整个批处理工做终止的话那么会致使后续的数据没法被处理。Spring Batch内置了Retry(重试)和Skip(跳过)机制帮助咱们轻松处理各类异常。适合Retry的异常的特色是这些异常可能会随着时间推移而消失,好比数据库目前有锁没法写入、web服务当前不可用、web服务满载等。因此对这些异常咱们能够配置Retry机制。而有些异常则不该该配置Retry,好比解析文件出现异常等,由于这些异常即便Retry也会始终失败。
即便Retry屡次仍然失败也无需让整个step失败,能够对指定的异常设置Skip选项从而保证后续的数据可以被继续处理。咱们也能够配置SkipLimit选项保证当Skip的数据条目达到必定数量后及时终止整个Job。
有时候咱们须要在每次Retry中间隔作一些操做,好比延长Retry时间,恢复操做现场等,Spring Batch提供了BackOffPolicy来达到目的。下面是一个配置了Retry机制、Skip机制以及BackOffPolicy的step示例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
@Bean public Step step(){ return stepBuilders.get("step") .<Partner,Partner>chunk(1) .reader(reader()) .processor(processor()) .writer(writer()) .listener(logProcessListener()) .faultTolerant() .skipLimit(10) .skip(UnknownGenderException.class) .retryLimit(5) .retry(ServiceUnavailableException.class) .backOffPolicy(backoffPolicy) .listener(logSkipListener()) .build(); } |
使用自定义的Decider来实现Job flow
在Job执行过程当中不必定都是顺序执行的,咱们常常须要根据某个job的输出数据或执行结果来决定下一步的走向。之前咱们会把一些判断放置在下游step中进行,这样可能会致使有些step实际运行了,但其实并无作任何事情。好比一个step执行过程当中会将失败的数据条目记录到一个报告中,而下一个step会判断有没有生成报告,若是生成了报告则将该报告发送给指定联系人,若是没有则不作任何事情。这种状况下能够经过Decider机制来实现Job的执行流程。在Spring batch 3.0中Decider已经从Step中独立出来,和Step处于同一级别。
1 2 3 4 5 6 7 8 9 10 |
public class ReportDecider implements JobExecutionDecider { @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { if (report.isExist()) { return new FlowExecutionStatus(“SEND"); } return new FlowExecutionStatus(“SKIP"); } } |
而在job配置中能够这样来使用Decider。这样整个Job的执行流程会更加清晰易懂。
1 2 3 4 5 6 7 8 |
public Job job() { return new JobBuilder("petstore") .start(orderProcess()) .next(reportDecider) .on("SEND").to(sendReportStep) .on("SKIP").end().build() .build() } |
采用多种机制加速Job的执行
批处理工做处理的数据量大,而执行窗口通常又要求比较小。因此必需要经过多种方式来加速Job的执行。通常咱们有四种方式来实现:
- 在单个step中多线程执行任务
- 并行执行不一样的Step
- 并行执行同一个Step
- 远程执行Chunk任务
在单个step多线程执行任务能够借助于taskExecutor来实现。这种状况适合于reader、writer是线程安全的而且是无状态的场景。咱们还能够设置线程数量。
1 2 3 4 5 6 |
public Step step() { return stepBuilders.get("step") .tasklet(tasklet) .throttleLimit(20) .build(); } |
上述示例中的tasklet须要实现TaskExecutor,Spring Batch提供了一个简单的多线程TaskExecutor供咱们使用:SimpleAsyncTaskExecutor。
并行执行不一样的Step在Spring batch中很容易实现,如下是一个示例:
1 2 3 4 5 6 7 |
public Job job() { return stepBuilders.get("parallelSteps") .start(step1) .split(asyncTaskExecutor).add(flow1, flow2) .next(step3) .build(); } |
在这个示例中咱们先执行step1,而后并行执行flow1和flow2,最后再执行step3。
Spring batch提供了PartitionStep来实现对同一个step在多个进程中实现并行处理。经过PartitonStep再配合PartitionHandler能够将一个step扩展到多个Slave上实现并行运行。
远程执行Chunk任务则是将某个Step的processer操做分割到多个进程中,多个进程经过一些中间件进行通信(好比采用消息的方式)。这种方式适合于Processer是瓶颈而Reader和Writer不是瓶颈的场景。
结语
Spring Batch对批处理场景进行了合理的抽象,封装了大量的实用功能,使用它来开发批处理应用能够达到事半功倍的效果。在使用的过程当中咱们仍须要坚持总结一些最佳实践,从而可以交付高质量的可维护的批处理应用,知足企业级应用的苛刻要求。