在 Spring Batch 中进行数据及参数传递的方法。
本文是 Spring Batch 系列文章的第9篇,有兴趣的可见文章:java
前面文章以实例的方式对 Spring Batch 进行批处理进行详细说明,相信你们对字符串、文件,关系型数据库及 NoSQL 数据库的读取,处理,写入流程已比较熟悉。有小伙伴就问,针对这个任务流程,期间有多个步骤,从任务( Job )启动,到做业步( Step )的执行,其中又包含读组件、处理组件、写组件,那么,针对这个流程,若中间须要传递自定义的数据,该如何处理?本文将对 Spring Batch 进行数据传递的方法进行描述,依然会使用代码实例的方式进行讲解。包括如下几个内容:mysql
本示例源码已放至github:https://github.com/mianshenglee/spring-batch-example/tree/master/spring-batch-param
,请结合示例代码进行阅读。git
本示例仍是使用原来示例功能,从源数据库读取用户数据,处理数据,而后写入到目标数据库。其中会在任务启动时传递参数,并在做业步中传递参数。以前已经介绍过如何使用 beetlsql 进行多数据源配置(便捷的数据读写-spring batch(5)结合beetlSql进行数据读写),实现数据批处理。还有不少朋友使用 Mybatis 或 Mybatis-plus 进行数据库读写,所以,有必要提一下 Spring Batch 如何结合 Mybatis 或 Mybatis-plus 配置多数据源操做。本示例以 Mybatis-plus 为例。github
示例工程中的sql
目录有相应的数据库脚本,其中源数据库mytest.sql
脚本建立一个test_user
表,并有相应的测试数据。目标数据库 my_test1.sql
与 mytest.sql
表结构一致,spring-batch-mysql.sql
是 Spring Batch 自己提供的数据库脚本。spring
<!--mybatis-plus--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.4.0</version> </dependency>
本示例会涉及三个数据库,分别是 Spring Batch 自己数据库,须要批处理的源数据库,批处理的目标数据库。所以须要处理多个数据库,利用多套源策略,能够很简单就完成多套数据源的处理。简单来讲主要分为如下几个步骤:sql
mapper
包,entity
包,mapper.xml
文件包 SqlSessionFactory
mapper
关于多数据源多套源策略的详细配置过程,能够参考个人另外一篇文章《搞定SpringBoot多数据源(1):多套源策略》数据库
关于 Spring Batch 的读数据( ItemReader )、处理数据( ItemProcessor )、写数据( ItemWriter )的配置流程,能够参考前面系列文章,本文再也不详细描述。咱们须要记住的是,当一个做业( Job )启动,Spring Batch 是经过做业名称( Job name)及 做业参数( JobParameters )做为惟一标识来区分不一样的做业。一个 Job 下能够有多个做业步( Step ),每一个 Step 中就是有具体的操做逻辑(读、处理、写)。在 Job 和 Step 下的各个操做步骤间,如何传递,,这里就须要理解 ExecutionContext 的概念。mybatis
在 Job 的运行及 Step 的运行过程当中,Spring Batch 提供 ExecutionContext 进行运行数据持久化,利用它,能够根据业务进行数据共享,如用来重启的静态数据与状态数据。以下图:app
Execution Context 本质上来说就是一个 Map<String,Object>
,它是Spring Batch 框架提供的持久化与控制的 key/value 对,可让开发者在 Step 运行或Job 运行过程当中保存须要进行持久化的状态,它能够。分为两类,一类是Job 运行的上下文(对应数据表:BATCH_JOB_EXECUTION_CONTEXT),另外一类是Step Execution的上下文(对应数据表BATCH_STEP_EXECUTION_CONTEXT)。两类上下文关系:一个 Job 运行对应一个 Job Execution 的上下文(如上图中蓝色部分的 ExecutionContext ),每一个 Step 运行对应一个 Step Execution 上下文(如上图中粉色部分的 ExecutionContext );同一个 Job 中的 Step Execution 共用 Job Execution 的上下文。也就是说,它们的做用范围有区别。所以,若是同一个 Job 的不一样 Step 间须要共享数据时,能够经过 Job Execution 的上下文共享数据。根据 ExecutionContext 的共享数据特性,则能够实如今不一样步骤间传递数据。框架
一个 Job 启动后,会生成一个 JobExecution ,用于存放和记录 Job 运行的信息,一样,在 Step 启动后,也会有对应的 StepExecution 。如前面所说,在 JobExecution 和 StepExecution 中都会有一个 ExecutionContext ,用于存储上下文。所以,数据传递的思路就是肯定数据使用范围,而后经过 ExecutionContext 传入数据,而后就能够在对应的范围内共享数据。如当前示例,须要 Job 范围内共享数据,在读组件( ItemReader )和写组件( ItemWriter )中传递读与写数据的数量( size ),在 Job 结束时,输出读及写的数据量。实际上 Spring Batch 会自动计算读写数量,本示例仅为了显示数据共享功能。
那么,如何获取对应的 Execution ?,Spring Batch 提供了 JobExecutionListener 和 StepExecutionListener 监听器接口,经过实现监听器接口,分别能够在开启做业前( beforeJob )和 完成做业后( afterJob )afterJob ),开启做业步前( beforeStep)及 完成做业步后( afterStep )获取对应的 Execution ,而后进行操做。
在自定义的 UserItemReader 和 UserItemWriter 中,实现 StepExecutionListener 接口,其中使用 StepExecution 做为成员,从 beforeStep 中获取。以下:
public class UserItemWriter implements ItemWriter<TargetUser>, StepExecutionListener { private StepExecution stepExecution; //...略 @Override public void beforeStep(StepExecution stepExecution) { this.stepExecution = stepExecution; } }
读组件( UserItemReader )也使用一样的方式。而在做业结束后,获取参数,则能够继承 JobExecutionListenerSupport ,实现本身感兴趣的方法,也从参数中获取 JobExecution,而后获取参数进行处理。
public class ParamJobEndListener extends JobExecutionListenerSupport { @Override public void afterJob(JobExecution jobExecution) {} }
因为咱们须要在 Job 范围内传递参数,获取到 StepExecution 后,能够得到相应的 JobExecution ,进而获取 Job 对应的 executionContext,这样,就能够在 Job 范围内共享参数数据了。以下是在读组件中进行配置
ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext(); executionContext.put(SyncConstants.PASS_PARAM_READ_NUM, items.size());
一样在写组件中,获取到 ExecutionContext 后,能够对参数进行处理。本示例中,是经过对 ItemReader 传递的处理数目参数进行累加处理,获得结果。
@Override public void write(List<? extends TargetUser> items) { ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext(); Object currentWriteNum = executionContext.get(SyncConstants.PASS_PARAM_WRITE_NUM); if (Objects.nonNull(currentWriteNum)) { log.info("currentWriteNum:{}", currentWriteNum); executionContext.put(SyncConstants.PASS_PARAM_WRITE_NUM, items.size()+(Integer)currentWriteNum); } else { executionContext.put(SyncConstants.PASS_PARAM_WRITE_NUM, items.size()); }
最后在做业结束后,在实现 JobExecutionListenerSupport 的接口中,afterJob 函数中,对参数进行输出。
public class ParamJobEndListener extends JobExecutionListenerSupport { @Override public void afterJob(JobExecution jobExecution) { ExecutionContext executionContext = jobExecution.getExecutionContext(); Integer writeNum = (Integer)executionContext.get(SyncConstants.PASS_PARAM_WRITE_NUM); log.info(LogConstants.LOG_TAG + "writeNum:{}",writeNum); } }
前面说到在 Job 及 Step 范围内,使用 ExecutionContext 进行数据共享,但,若是须要在 Job 启动前设置参数,而且每次启动输入的参数是动态变化的(好比增量同步时,日期是基于上一次同步的时间或者ID),也就是说,每次运行,须要根据参数新建一个操做步骤(如 ItemReader、ItemWriter等),咱们知道,因为在 Spring IOC 中加载的Bean,默认都是单例模式的,所以,须要每次运行新建,运行完销毁,新建是在运行时进行的。这就须要用到StepScope 及后期绑定技术。
在以前的示例中,已出现过 StepScope,它的做用是提供了操做步骤的做用范围,某个 Spring Bean 使用注解StepScope,则表示此 Bean 在做业步( Step )开始的时候初始化,在 Step 结束的时候销毁,也就是说 Bean的做用范围是在 Step 这个生命周期中。而 Spring Batch 经过属性后期绑定技术,在运行期获取属性值,并使用 SPEL 的表达式进行属性绑定。而在 StepScope 中,Spring Batch 框架提供 JobParameters,JobExecutionContext,StepExecutionContext,固然也可使用 Spring 容器中的 Bean ,如 JobExecution ,StepExecution。
一个 Job 是由 Job name 及 JobParameters 做为惟一标识的,也就是说只有 job name 和 JobParameters 不一致时,Spring Batch 才会启动一个新的 Job,一致的话就看成是同一个 Job ,若 此 Job 未执行过,则执行;若已执行过且是 FAILED 状态,则尝试从新运行此 Job ,若已执行过且是 COMPLETED 状态,则会报错。
本示例中,Job 启动时输入时间参数,在 ItemReader 中使用 StepScope 注解,而后把时间参数绑定到 ItemReader 中,同时绑定 StepExecution ,以便于在 ItemReader 对时间参数及 StepExecution 进行操做。
在使用 JobLauncher 启动 Job 时,是须要输入 jobParameters 做为参数的。所以能够建立此对象,并设置参数。
JobParameters jobParameters = new JobParametersBuilder() .addLong("time",timMillis) .toJobParameters();
在配置 Step 时,须要建立ItemReader 的 Bean,为了使用动态参数,在 ItemReader 中设置 Map 存放参数,并设置 StepExecution 为成员,以便于后面使用 ExecutionContext。
public class UserItemReader implements ItemReader<User> { protected Map<String, Object> params; private StepExecution stepExecution; public void setStepExecution(StepExecution stepExecution) { this.stepExecution = stepExecution; } }
使用 StepScope 进行配置:
@Bean @StepScope public ItemReader paramItemReader(@Value("#{stepExecution}") StepExecution stepExecution, @Value("#{jobParameters['time']}") Long timeParam) { UserItemReader userItemReader = new UserItemReader(); //设置参数 Map<String, Object> params = CollUtil.newHashMap(); Date datetime = new Date(timeParam); params.put(SyncConstants.PASS_PARAM_DATETIME, datetime); userItemReader.setParams(params); userItemReader.setStepExecution(stepExecution); return userItemReader; }
注意:此时 ItemReader 不可再使用实现 StepExecutionListener 的方式来对 stepExecution 赋值,因为 ItemReader 是动态绑定的,StepExecutionListener 将再也不起做用,所以须要在后期绑定中来绑定 stepExecution Bean 的方式来赋值。
ItemReader 获取到 StepExecution 后便可获取 ExecutionContext,而后能够像前面说的使用 ExecutionContext 方式进行数据传递。以下:
ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext(); //readNum参数 executionContext.put(SyncConstants.PASS_PARAM_READ_NUM, items.size()); //datetime参数 executionContext.put(SyncConstants.PASS_PARAM_DATETIME,params.get(SyncConstants.PASS_PARAM_DATETIME));
在 Job 和 Step 不一样的数据范围中,可以使用 ExecutionContext 共享数据。本文以传递处理数量为例,使用 Mybatis-plus,基于 ExecutionContext ,结合 StepScope及后期绑定技术,实如今 Job 启动传入参数,而后在 ItemReader、ItemProcessor、ItemWriter 及 Job 完成后的数据共享及传递。若是你在使用 Spring Batch 过程当中须要进行数据共享与传递,请试试这种方式吧。
若是文章内容对你有帮助,欢迎转发分享~
个人公众号(搜索Mason技术记录
),获取更多技术记录: