tags: springbatchjava
前面《数据批处理神器-Spring Batch(1)简介及使用场景》已经介绍了Spring Batch
是一个轻量级,完善的批处理框架,它使用起来简单,方便,比较适合有点编程基础(特别是使用Spring及SpringBoot框架)的开发人员,针对业务编程,只须要关心具体的业务实现便可,把流程以及流程的控制交给Spring Batch
就好。常言道"talk is cheap, show me the code
",下面咱们就经过一个简单的hello world
,进入Spring Batch
的世界,经过这个示例,能够快速了解开发批处理的流程和Spring Batch
开发用到的组件,为后续的操做打下基础。git
本helloworld实现一个很是简单的功能,就是从数据组中读取字符串,把字符串转为大写,而后输出到控制台。如图:github
整个过程就是一个批重任务(Job
),它只有一个步骤(Job Step
),步骤里分为三个阶段,读数据(ItemReader)、处理数据(ItemProcessor)、写数据(ItemWriter)。spring
开发的主要代码以下:数据库
整体来讲就是,经过Reader
,Processor
、Writer
完成任务,结束后经过Listener
进行监听,整个任务经过配置(BatchConfig
)进行配置。编程
Spring Boot
工程直接使用Idea生成或在使用Spring Initializr
生成便可,此处不详细说明。也能够直接使用个人代码示例。当前使用的Spring Boot
版本是2.1.4.RELEASE
数组
Spring Batch
依赖 在使用spring-boot-starter-parent
的状况下,直接添加如下依赖便可:<!-- 批处理框架-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
复制代码
引用后,会引用两个jar包,一个是spring-batch-infrastructure
,一个是spring-batch-core
,版本是4.1.2.RELEASE
。分别对应的是基础框架层和核心层。bash
Spring Batch
是须要数据库来存储任务的基本信息以及运行状态的,本例中不须要操做数据库逻辑,直接使用内存数据库H2便可。添加如下依赖:<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
复制代码
lombok
进行处理。使用Spring Boot
进行单元测试,添加依赖以下:<!-- 工具包:lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<!-- 测试框架 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
复制代码
添加完依赖后,就能够进入业务逻辑编程了。按Spring Batch
的批处理流程,读数据ItemReader
是第一步,当前示例中,咱们的任务是从数组中读取数据。ItemReader
是一个接口,开发人员直接实现此接口便可。此接口定义了核心方法read()
,负责从给定的资源中读取可用的数据。具体实现以下:框架
@Slf4j
public class StringReader implements ItemReader<String> {
private String[] messages = {"aaa1","aaa2","aaa3","aaa4"};
private int count = 0;
@Override
public String read() throws UnexpectedInputException, ParseException, NonTransientResourceException {
if(count < messages.length){
String message = messages[count++];
log.debug(LogConstants.LOG_TAG + "read data:"+message);
return message;
}else{
log.debug(LogConstants.LOG_TAG + "read data end.");
count = 0;
}
return null;
}
}
复制代码
说明:ide
@Slf4j
注解,直接可以使用log进行输出,简化操做。读取数据后,返回的数据会流到ItemProcessor
进行处理。一样,ItemProcessor
是一个接口,要实现本身的处理逻辑,实现此接口便可。固然,若是没有ItemProcessor
,读到的数据直接就到ItemWriter
流程也是能够的。此处,Spring Batch
有一个Chunk
的概念,用于屡次读,直到chunk指定的数量后,再统一给到processor和writer,以提升效率。本示例对于ItemProcessor
的实现很简单,即把字符串转为大写。以下:
@Slf4j
public class ConvertProcessor implements ItemProcessor<String,String> {
@Autowired
private ConsoleService consoleService;
@Override
public String process(String data) {
String dataProcessed = consoleService.convert2UpperCase(data);
log.debug(LogConstants.LOG_TAG + data +" process data --> " + dataProcessed);
return dataProcessed;
}
}
复制代码
说明:
toUpperCase()
方法数据处理完后,会统一交给写组件(ItemWriter
)进行写入。ItemWriter
也是一个接口,核心方法是write
方法,参数是数组。要实现本身的逻辑,实现此接口便可。本示例中,直接把数据输出到日志中便可。以下:
@Slf4j
public class ConsoleWriter implements ItemWriter<String> {
@Override
public void write(List<? extends String> list) {
for (String msg :list) {
log.debug(LogConstants.LOG_TAG + "write data: "+msg);
}
}
}
复制代码
数据写入到目标后,任务即结束,但有时候咱们还须要在任务结束时去作一些其它工做,如清理数据,更新时间等,则须要在任务完成后进行逻辑处理。Spring Batch
对于任务或步骤开始和结束都会提供监听,以便于开发人员实现监听逻辑。如经过继承JobExecutionListenerSupport
,能够实现beforeJob
和afterJob
的监听,以实现开始任务前和结束任务后的处理。当前示例中,仅输出任务完成的日志。以下:
@Slf4j
public class ConsoleJobEndListener extends JobExecutionListenerSupport {
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED){
log.debug("console batch job complete!");
}
}
}
复制代码
通过上面的读、处理、写、任务完成后监听的操做,如今须要把它们组装在一块儿,造成一个完成的任务,使用Spring Boot
,简单的使用几个配置便可完成任务的组装。任务及其相关组件的关系以下:
建立配置文件ConsoleBatchConfig.java
,具体代码以下:
@Configuration
@EnableBatchProcessing
public class ConsoleBatchConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public Job consoleJob(Step consoleStep,JobExecutionListener consoleListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return jobBuilderFactory.get(funcName).listener(consoleListener).flow(consoleStep)
.end().build();
}
@Bean
public Step consoleStep(ItemReader stringReader,ItemProcessor convertProcessor ,ItemWriter consoleWriter, CommonStepListener commonStepListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return stepBuilderFactory.get(funcName).listener(commonStepListener)
.<String,String>chunk(3).reader(stringReader).processor(convertProcessor)
.writer(consoleWriter).build();
}
@Bean
public ItemReader stringReader(){return new StringReader();}
@Bean
public ItemWriter consoleWriter(){return new ConsoleWriter();}
@Bean
public ItemProcessor convertProcessor(){return new ConvertProcessor();}
@Bean
public JobExecutionListener consoleListener(){return new ConsoleJobEndListener();}
}
复制代码
说明:
@Configuration及
和@EnableBatchProcessing
,标识为配置及启用Spring Batch
的配置(能够直接使用JobBuilderFactory
及StepBuilderFactory
分别用于建立Job和Step)。ItemReader
、ItemWriter
、ItemProcessor
、Listener
对应的Bean
,以供Step及Job的注入。stepBuilderFactory
建立做业Step,其中chunk进行面向块的处理,即屡次读取后再写入,提升效率。当前配置是3个为一个chunk。jobBuilderFactory
添加step,建立任务。get
方法肯定),此处直接使用方法名做为Job和Step的名称。通过上面的步骤,已经完成Job的开发,测试则可以使用两种方式,一个是编写Controller
,以接口调用的方式运行job,一种编写单元测试。
JobLauncher
的run
方法来运行任务,run
方法参数分别是Job
和jobParameters
,即已配置的Job及job运行的参数。每一个任务的区分是经过任务名(jobName
)和任务参数(jobParameters
)做为区别的,即若是jobName
和jobParameters
相同,Spring Batch
会认为是同一任务,若任务已运行成功,同一任务不会再运行。所以,通常来讲,不一样的任务,咱们的jobParameters
能够直接以时间做为参数,以便于区别。生成jobParameters
。代码以下:JobParameters jobParameters = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.toJobParameters();
复制代码
ConsoleJobTest
,加载job,运行测试,以下所示:@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,ConsoleBatchConfig.class})
@Slf4j
public class ConsoleJobTest {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job consoleJob;
public void testConsoleJob2() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
//构建参数
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.toJobParameters();
//执行任务
JobExecution run = jobLauncher.run(consoleJob, jobParameters);
ExitStatus exitStatus = run.getExitStatus();
log.debug(exitStatus.toString());
}
}
复制代码
说明:引入SpringBootTest
注解时,须要把Spring Batch
任务也引入进来。
执行结果输出 执行结果以下图所示:
从输出可知,因为设置的chunk
是3,读取3个数据后,就统一给ItemProcessor
进行大写转换处理,而后统一交给ItemWriter
进行写入。执行完成后,Job的exitCode表示任务执行的状态,若是正常则为COMPLETED
,失败则为FAILED
。
通过以上的操做步骤,便可完成批处理操做。关于任务的状态,流程的步骤(读、处理、写)均交给Spring Batch
来完成,开发人员所作的工做是根据本身的业务逻辑编写具体的读数据、处理数据和写数据便可。但愿经过本文,你们能够对Spring Batch
的组件有清晰的了解。