本文将从0到1讲解一个Spring Batch是如何搭建并运行起来的。
本教程将讲解从一个文本文件读取数据,而后写入MySQL。java
Spring Batch 做为 Spring 的子项目,是一款基于 Spring 的企业批处理框架。经过它能够构建出健壮的企业批处理应用。Spring Batch 不只提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程当中解放出来,使他们能够更多地去关注核心的业务处理过程。mysql
更多的介绍能够参考官网:https://spring.io/projects/sp...spring
我是用的Intellij Idea,用gradle构建。sql
可使用Spring Initializr 来建立Spring boot应用。地址:https://start.spring.io/数据库
首先选择Gradle Project,而后选择Java。填上你的Group和Artifact名字。json
最后再搜索你须要用的包,好比Batch是必定要的。另外,因为我写的Batch项目是使用JPA向MySQL插入数据,因此也添加了JPA和MySQL。其余能够根据本身须要添加。并发
点击Generate Project,一个项目就建立好了。app
Build.gralde文件大概就长这个样子:框架
buildscript { ext { springBootVersion = '2.0.4.RELEASE' } repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") } } apply plugin: 'java' apply plugin: 'idea' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management' group = 'com.demo' version = '0.0.1-SNAPSHOT' sourceCompatibility = 1.8 repositories { mavenCentral() } dependencies { compile('org.springframework.boot:spring-boot-starter-batch') compile('org.springframework.boot:spring-boot-starter-jdbc') compile("org.springframework.boot:spring-boot-starter-data-jpa") compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-joda', version: '2.9.4' compile group: 'org.jadira.usertype', name: 'usertype.core', version: '6.0.1.GA' compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6', testCompile('org.springframework.boot:spring-boot-starter-test') testCompile('org.springframework.batch:spring-batch-test') }
网上有不少Spring Batch结构和原理的讲解,我就不详细阐述了,我这里只讲一下Spring Batch的一个基本层级结构。maven
首先,Spring Batch运行的基本单位是一个Job,一个Job就作一件批处理的事情。
一个Job包含不少Step,step就是每一个job要执行的单个步骤。
以下图所示,Step里面,会有Tasklet,Tasklet是一个任务单元,它是属于能够重复利用的东西。
而后是Chunk,chunk就是数据块,你须要定义多大的数据量是一个chunk。
Chunk里面就是不断循环的一个流程,读数据,处理数据,而后写数据。Spring Batch会不断的循环这个流程,直到批处理数据完成。
首先,咱们须要一个全局的Configuration来配置全部的Job和一些全局配置。
代码以下:
@Configuration @EnableAutoConfiguration @EnableBatchProcessing(modular = true) public class SpringBatchConfiguration { @Bean public ApplicationContextFactory firstJobContext() { return new GenericApplicationContextFactory(FirstJobConfiguration.class); } @Bean public ApplicationContextFactory secondJobContext() { return new GenericApplicationContextFactory(SecondJobConfiguration.class); } }
@EnableBatchProcessing是打开Batch。若是要实现多Job的状况,须要把EnableBatchProcessing注解的modular设置为true,让每一个Job使用本身的ApplicationConext。
好比上面代码的就建立了两个Job。
本博客的例子是迁移数据,数据源是一个文本文件,数据量是上百万条,一行就是一条数据。而后咱们经过Spring Batch帮咱们把文本文件的数据所有迁移到MySQL数据库对应的表里面。
假设咱们迁移的数据是Message,那么咱们就须要提早建立一个叫Message的和数据库映射的数据类。
@Entity @Table(name = "message") public class Message { @Id @Column(name = "object_id", nullable = false) private String objectId; @Column(name = "content") private String content; @Column(name = "last_modified_time") private LocalDateTime lastModifiedTime; @Column(name = "created_time") private LocalDateTime createdTime; }
首先咱们须要一个关于这个Job的Configuration,它将在SpringBatchConfigration里面被加载。
@Configuration @EnableAutoConfiguration @EnableBatchProcessing(modular = true) public class SpringBatchConfiguration { @Bean public ApplicationContextFactory messageMigrationJobContext() { return new GenericApplicationContextFactory(MessageMigrationJobConfiguration.class); } }
下面的关于构建Job的代码都将写在这个MessageMigrationJobConfiguration里面。
public class MessageMigrationJobConfiguration { }
咱们先定义一个Job的Bean。
@Autowired private JobBuilderFactory jobBuilderFactory; @Bean public Job messageMigrationJob(@Qualifier("messageMigrationStep") Step messageMigrationStep) { return jobBuilderFactory.get("messageMigrationJob") .start(messageMigrationStep) .build(); }
jobBuilderFactory是注入进来的,get里面的就是job的名字。
这个job只有一个step。
接下来就是建立Step。
@Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Step messageMigrationStep(@Qualifier("jsonMessageReader") FlatFileItemReader<Message> jsonMessageReader, @Qualifier("messageItemWriter") JpaItemWriter<Message> messageItemWriter, @Qualifier("errorWriter") Writer errorWriter) { return stepBuilderFactory.get("messageMigrationStep") .<Message, Message>chunk(CHUNK_SIZE) .reader(jsonMessageReader).faultTolerant().skip(JsonParseException.class).skipLimit(SKIP_LIMIT) .listener(new MessageItemReadListener(errorWriter)) .writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT) .listener(new MessageWriteListener()) .build(); }
stepBuilderFactory是注入进来的,而后get里面是Step的名字。
咱们的Step中能够构建不少东西,好比reader,processer,writer,listener等等。
下面咱们就逐个来看看step里面的这些东西是如何使用的。
Spring batch在配置Step时采用的是基于Chunk的机制,即每次读取一条数据,再处理一条数据,累积到必定数量后再一次性交给writer进行写入操做。这样能够最大化的优化写入效率,整个事务也是基于Chunk来进行。
好比咱们定义chunk size是50,那就意味着,spring batch处理了50条数据后,再统一贯数据库写入。
这里有个很重要的点,chunk前面须要定义数据输入类型和输出类型,因为咱们输入是Message,输出也是Message,因此两个都直接写Message了。
若是不定义这个类型,会报错。
.<Message, Message>chunk(CHUNK_SIZE)
Reader顾名思义就是从数据源读取数据。
Spring Batch给咱们提供了不少好用实用的reader,基本能知足咱们全部需求。好比FlatFileItemReader,JdbcCursorItemReader,JpaPagingItemReader等。也能够本身实现Reader。
本例子里面,数据源是文本文件,因此咱们就使用FlatFileItemReader。FlatFileItemReader是从文件里面一行一行的读取数据。
首先须要设置文件路径,也就是设置resource。
由于咱们须要把一行文本映射为Message类,因此咱们须要本身设置并实现LineMapper。
@Bean public FlatFileItemReader<Message> jsonMessageReader() { FlatFileItemReader<Message> reader = new FlatFileItemReader<>(); reader.setResource(new FileSystemResource(new File(MESSAGE_FILE))); reader.setLineMapper(new MessageLineMapper()); return reader; }
LineMapper的输入就是获取一行文本,和行号,而后转换成Message。
在本例子里面,一行文本就是一个json对象,因此咱们使用JsonParser来转换成Message。
public class MessageLineMapper implements LineMapper<Message> { private MappingJsonFactory factory = new MappingJsonFactory(); @Override public Message mapLine(String line, int lineNumber) throws Exception { JsonParser parser = factory.createParser(line); Map<String, Object> map = (Map) parser.readValueAs(Map.class); Message message = new Message(); ... // 转换逻辑 return message; } }
因为本例子里面,数据是一行文本,经过reader变成Message的类,而后writer直接把Message写入MySQL。因此咱们的例子里面就不须要Processor,关于如何写Processor其实和reader/writer是同样的道理。
从它的接口能够看出,须要定义输入和输出的类型,把输入I经过某些逻辑处理以后,返回输出O。
public interface ItemProcessor<I, O> { O process(I item) throws Exception; }
Writer顾名思义就是把数据写入到目标数据源里面。
Spring Batch一样给咱们提供不少好用实用的writer。好比JpaItemWriter,FlatFileItemWriter,HibernateItemWriter,JdbcBatchItemWriter等。一样也能够自定义。
本例子里面,使用的是JpaItemWriter,能够直接把Message对象写到数据库里面。可是须要设置一个EntityManagerFactory,能够注入进来。
@Autowired private EntityManagerFactory entityManager; @Bean public JpaItemWriter<Message> messageItemWriter() { JpaItemWriter<Message> writer = new JpaItemWriter<>(); writer.setEntityManagerFactory(entityManager); return writer; }
另外,你须要配置数据库的链接等东西。因为我使用的spring,因此直接在Application.properties里面配置以下:
spring.datasource.url=jdbc:mysql://database spring.datasource.username=username spring.datasource.password=password spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect spring.jpa.show-sql=true spring.jpa.properties.jadira.usertype.autoRegisterUserTypes=true spring.jackson.serialization.write-dates-as-timestamps=false spring.batch.initialize-schema=ALWAYS spring.jpa.hibernate.ddl-auto=update
spring.datasource相关的设置都是在配置数据库的链接。
spring.batch.initialize-schema=always表示让spring batch在数据库里面建立默认的数据表。
spring.jpa.show-sql=true表示在控制台输出hibernate读写数据库时候的SQL。
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect是在指定MySQL的方言。
Spring Batch一样实现了很是完善全面的listener,listener很好理解,就是用来监听每一个步骤的结果。好比能够有监听step的,有监听job的,有监听reader的,有监听writer的。没有你找不到的listener,只有你想不到的listener。
在本例子里面,我只关心,read的时候有没有出错,和write的时候有没有出错,因此,我只实现了ReadListener和WriteListener。
在read出错的时候,把错误结果写入一个单独的error列表文件中。
public class MessageItemReadListener implements ItemReadListener<Message> { private Writer errorWriter; public MessageItemReadListener(Writer errorWriter) { this.errorWriter = errorWriter; } @Override public void beforeRead() { } @Override public void afterRead(Message item) { } @Override public void onReadError(Exception ex) { errorWriter.write(format("%s%n", ex.getMessage())); } }
在write出错的时候,也作一样的事情,把出错的缘由写入单独的日志中。
public class MessageWriteListener implements ItemWriteListener<Message> { @Autowired private Writer errorWriter; @Override public void beforeWrite(List<? extends Message> items) { } @Override public void afterWrite(List<? extends Message> items) { } @Override public void onWriteError(Exception exception, List<? extends Message> items) { errorWriter.write(format("%s%n", exception.getMessage())); for (Message message : items) { errorWriter.write(format("Failed writing message id: %s", message.getObjectId())); } } }
前面有说chuck机制,因此write的listener传入参数是一个List,由于它是累积到必定的数量才一块儿写入。
Spring Batch提供了skip的机制,也就是说,若是出错了,能够跳过。若是你不设置skip,那么一条数据出错了,整个job都会挂掉。
设置skip的时候必定要设置什么Exception才须要跳过,而且跳过多少条数据。若是失败的数据超过你设置的skip limit,那么job就会失败。
你能够分别给reader和writer等设置skip机制。
writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
这个和Skip是同样的原理,就是失败以后能够重试,你一样须要设置重试的次数。
一样能够分别给reader,writer等设置retry机制。
若是同时设置了retry和skip,会先重试全部次数,而后再开始skip。好比retry是10次,skip是20,会先重试10次以后,再开始算第一次skip。
全部东西都准备好之后,就是如何运行了。
运行就是在main方法里面用JobLauncher去运行你制定的job。
下面是我写的main方法,main方法的第一个参数是job的名字,这样咱们就能够经过不一样的job名字跑不一样的job了。
首先咱们经过运行起来的Spring application获得jobRegistry,而后经过job的名字找到对应的job。
接着,咱们就能够用jobLauncher去运行这个job了,运行的时候会传一些参数,好比你job里面须要的文件路径或者文件日期等,就能够经过这个jobParameters传进去。若是没有参数,能够默认传当前时间进去。
public static void main(String[] args) { String jobName = args[0]; try { ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args); JobRegistry jobRegistry = context.getBean(JobRegistry.class); Job job = jobRegistry.getJob(jobName); JobLauncher jobLauncher = context.getBean(JobLauncher.class); JobExecution jobExecution = jobLauncher.run(job, createJobParams()); if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) { throw new RuntimeException(format("%s Job execution failed.", jobName)); } } catch (Exception e) { throw new RuntimeException(format("%s Job execution failed.", jobName)); } } private static JobParameters createJobParams() { return new JobParametersBuilder().addDate("date", new Date()).toJobParameters(); }
最后,把jar包编译出来,在命令行执行下面的命令,就能够运行你的Spring Batch了。
java -jar YOUR_BATCH_NAME.jar YOUR_JOB_NAME
调试主要依靠控制台输出的log,能够在application.properties里面设置log输出的级别,好比你但愿输出INFO信息仍是DEBUG信息。
基本上,经过查看log都能定位到问题。
logging.path=build/logs logging.file=${logging.path}/batch.log logging.level.com.easystudio=INFO logging.level.root=INFO log4j.logger.org.springframework.jdbc=INFO log4j.logger.org.springframework.batch=INFO logging.level.org.hibernate.SQL=INFO
若是你的batch最终会写入数据库,那么Spring Batch会默认在你的数据库里面建立一些batch相关的表,来记录全部job/step运行的状态和结果。
大部分表你都不须要关心,你只须要关心几张表。
batch_job_instance:这张表能看到每次运行的job名字。
batch_job_execution:这张表能看到每次运行job的开始时间,结束时间,状态,以及失败后的错误消息是什么。
batch_step_execution:这张表你能看到更多关于step的详细信息。好比step的开始时间,结束时间,提交次数,读写次数,状态,以及失败后的错误信息等。
Spring Batch为咱们提供了很是实用的功能,对批处理场景进行了完善的抽象,它不只能实现小数据的迁移,也能应对大企业的大数据实践应用。它让咱们开发批处理应用能够事半功倍。
最后一个tips,搭建Spring Batch的过程当中,会遇到各类各样的问题。只要善用Google,都能找到答案。