Spring Batch(2)——Job配置与运行

Spring Batch(1)——数据批处理概念 文中介绍了批处理的概念以及Spring Batch相关的使用场景,后续将会陆续说明在代码层面如何使用。html

引入

Spring batch的引入很是简单,只须要引入Spring Framework、Datasource以及Spring Batch。在Spring Boot体系下只需引入spring-boot-starter-batch 便可。他已经涵盖了以上全部内容。spring

Job配置

Job接口有多种多样的实现类,一般咱们使用configuration类来构建获取一个Jobapp

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob") //Job名称
                     .start(playerLoad()) //Job Step
                     .next(gameLoad()) //Job Step
                     .next(playerSummarization()) //Job Step
                     .end()
                     .build();
}

上面的代码定义了一个Job实例,而且在这个实例中包含了三个Step实例框架

重启(启动)配置

批处理的一个核心问题是须要定义重启(启动)时的一些行为。当指定的JobInstanceJobExecution执行时候即认为某个Job已经重启(启动)。理想状态下,全部的任务都应该能够从它们以前中断的位置启动,可是某些状况下这样作是没法实现的。开发人员能够关闭重启机制或认为每次启动都是新的JobInstance异步

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .preventRestart() //防止重启
                     ...
                     .build();
}

监听Job Execution

当任务执行完毕或开始执行时,须要执行一些处理工做。这个时候能够使用JobExecutionListener分布式

public interface JobExecutionListener {
    void beforeJob(JobExecution jobExecution);
    void afterJob(JobExecution jobExecution);
}

添加方式:ide

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .listener(sampleListener()) //JobExecutionListener的实现类
                     ...
                     .build();
}

须要注意的是afterJob方法不管批处理任务成功仍是失败都会被执行,因此增长如下判断:spring-boot

public void afterJob(JobExecution jobExecution){
    if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
        //job success
    }
    else if(jobExecution.getStatus() == BatchStatus.FAILED){
        //job failure
    }
}

除了直接实现接口还能够用 @BeforeJob 和 @AfterJob 注解。ui

Java配置

在Spring Batch 2.2.0版本以后(Spring 3.0+)支持纯Java配置。其核心是@EnableBatchProcessing注解和两个构造器。@EnableBatchProcessing的做用相似于Spring中的其余@Enable*,使用@EnableBatchProcessing以后会提供一个基本的配置用于执行批处理任务。对应的会有一系列StepScope实例被注入到Ioc容器中:JobRepositoryJobLauncherJobRegistryPlatformTransactionManagerJobBuilderFactory以及StepBuilderFactorythis

配置的核心接口是BatchConfigurer,默认状况下须要在容器中指定DataSource,该数据源用于JobRepository相关的表。开发的过程当中能够使用自定义的BatchConfigurer实现来提供以上全部的Bean。一般状况下能够扩展重载DefaultBatchConfigurer类中的Getter方法用于实现部分自定义功能:

@Bean
public BatchConfigurer batchConfigurer() {
    return new DefaultBatchConfigurer() {
        @Override
        public PlatformTransactionManager getTransactionManager() {
            return new MyTransactionManager();
        }
    };
}

使用了@EnableBatchProcessing以后开发人员能够使用如下的方法来配置一个Job:

@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class AppConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
        return jobs.get("myJob").start(step1).next(step2).build();
    }

    @Bean
    protected Step step1(ItemReader<Person> reader,
                         ItemProcessor<Person, Person> processor,
                         ItemWriter<Person> writer) {
        return steps.get("step1")
            .<Person, Person> chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
    }

    @Bean
    protected Step step2(Tasklet tasklet) {
        return steps.get("step2")
            .tasklet(tasklet)
            .build();
    }
}

JobRepository配置

一旦使用了@EnableBatchProcessing 注解,JobRepository即会被注入到IoCs容器中并自动使用容器中的DataSourceJobRepository用于处理批处理表的CURD,整个Spring Batch的运行都会使用到它。除了使用容器中默认的DataSoruce以及其余组件,还能够在BatchConfigurer中进行配置:

@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
    factory.setTablePrefix("BATCH_");
    factory.setMaxVarCharLength(1000);
    return factory.getObject();
}

在代码中能够看到,设置JobRepository须要DataSourceTransactionManager,若是没有指定将会使用容器中的默认配置。

JobRepository的事物配置

默认状况下框架为JobRepository提供了默认PlatformTransactionManager事物管理。它用于确保批处理执行过程当中的元数据正确的写入到指定数据源中。若是缺少事物,那么框架产生元数据就没法和整个处理过程彻底契合。

以下图,在BatchConfigurer中的setIsolationLevelForCreate方法中能够指定事物的隔离等级:

protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
    return factory.getObject();
}

setIsolationLevelForCreate方法支持2个值:ISOLATION_SERIALIZABLEISOLATION_REPEATABLE_READ,前者是默认配置,相似于@Transactional(isolation = Isolation.SERIALIZABLE),表示查询和写入都是一次事物,会对事物进行严格的锁定,当事物完成提交后才能进行其余的读写操做,容易死锁。后者是读事物开放,写事物锁定。任什么时候候均可以快速的读取数据,可是写入事物有严格的事物机制。当一个事物挂起某些记录时,其余写操做必须排队。

修改表名称

默认状况下,JobRepository管理的表都以*BATCH_*开头。须要时能够修改前缀:

// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setTablePrefix("SYSTEM.TEST_"); //修改前缀
    return factory.getObject();
}

内存级存储

Spring Batch支持将运行时的状态数据(元数据)仅保存在内存中。重载JobRepository不设置DataSource便可:

@Override
protected JobRepository createJobRepository() throws Exception {
    MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
    factory.setTransactionManager(transactionManager);
    return factory.getObject();
}

须要注意的是,内存级存储没法知足分布式系统。

JobLauncher配置

启用了@EnableBatchProcessing以后JobLauncher会自动注入到容器中以供使用。此外能够自行进行配置:

@Override
protected JobLauncher createJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

JobLauncher惟一的必要依赖只有JobRepository。以下图,Job的执行一般是一个同步过程: Job同步执行

能够经过修改TaskExecutor来指定Job的执行过程:

@Bean
public JobLauncher jobLauncher() {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository());
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); //转换为异步任务
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

这样执行过程变为:

Job异步过程

运行一个Job

以一个Http为例:

@Controller
public class JobLauncherController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @RequestMapping("/jobLauncher.html")
    public void handle() throws Exception{
        jobLauncher.run(job, new JobParameters());
    }
}

单单是配置好Job是确定没法执行的,还须要对Step进行配置。后面会陆续介绍。

相关文章
相关标签/搜索