背景:以前看了公司的数据导入平台,导入用户订单记录,使用的是SpringBoot 自带的Scheduled 任务调度功能,而后经过发邮件通知开发人员任务调度是否成功。总体上来讲,功能是支持现有业务,可是开发人员不可以清晰的了解任务运行的情况且任务调度也不够灵活。原本想本身开发一套,想到本身可能考虑的不够周全,致使开发进度慢,每当有新改动会伤筋动骨,因此,我从网上找到了Spring Batch,首先看下SpringBatch的官网介绍:html
Spring Batch是一个轻量级,全面的批处理框架,旨在开发对企业系统平常运营相当重要的强大批处理应用程序。Spring Batch构建了人们指望的Spring Framework特性(生产力,基于POJO的开发方法和通常易用性),同时使开发人员能够在必要时轻松访问和利用更高级的企业服务。Spring Batch不是一个调度框架。商业和开源领域都有许多优秀的企业调度程序(例如Quartz,Tivoli,Control-M等)。它旨在与调度程序一块儿使用,而不是替换调度程序。java
Spring Batch提供了可重复使用的功能,这些功能对于处理大量记录相当重要,包括记录/跟踪,事务管理,做业处理统计,做业重启,跳过和资源管理。它还提供更高级的技术服务和功能,经过优化和分区技术实现极高容量和高性能的批处理做业。Spring Batch可用于两种简单的用例(例如将文件读入数据库或运行存储过程)以及复杂的大量用例(例如在数据库之间移动大量数据,转换它等等)上)。大批量批处理做业能够高度可扩展的方式利用该框架来处理大量信息。mysql
Spring Batch更多偏向于Job的配置,调度的话暂时不急于细看,以后会去专门研究一下Quartz等企业调度程序。git
ATP:咱们先不细致的去研究Spring Batch具体实现,先从一个小Demo开始入手spring
工具软件:sql
IDEA:2018.2 数据库
Java: 1.8.0_171数组
Gradle: 4.10.2app
Demo地址:https://gitee.com/leonchen21/SpringBatchDemo/tree/SpringBatchDemo_01框架
1、建立一个SpringBoot Gradle项目
SpringBootVersion:2.1.6
添加开发支持,如图所示
首先进行Spring Batch的配置,建立一个配置类,以下
package person.leon.batch.springbatchdemo.config; import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.explore.support.JobExplorerFactoryBean; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import javax.sql.DataSource; /** * 自定义Batch基础配置 * * @author leon * @since 2019/6/28 15:41 */ @Configuration public class BatchConfigurerConfig extends DefaultBatchConfigurer { @Autowired private DataSource dataSource; @Override public JobLauncher createJobLauncher() throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(getJobRepository()); //做业异步执行 针对http请求调用 // jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); jobLauncher.afterPropertiesSet(); return jobLauncher; } @Override public JobRepository createJobRepository() throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); factory.setDataSource(dataSource); factory.setTransactionManager(getTransactionManager()); factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE"); factory.setTablePrefix("BATCH_"); factory.setMaxVarCharLength(1000); return factory.getObject(); } @Override public JobExplorer createJobExplorer() throws Exception { JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean(); jobExplorerFactoryBean.setDataSource(dataSource); jobExplorerFactoryBean.afterPropertiesSet(); return jobExplorerFactoryBean.getObject(); } }
而后在application.propertities文件中添加如下配置
# SpringBatch 自动执行建表语句
spring.batch.initialize-schema=ALWAYS
SpringBatch框架自身也须要一些表支持,因此这里须要咱们进行配置,让框架自行建立,这里还须要再加一些默认数据库的配置。
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/batchdemo?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&allowMultiQueries=true&useSSL=false&autoReconnect=true&failOverReadOnly=false&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=123456
2、第一个Job配置
ATP:Job配置能够用xml文件形式配置,本文暂只用JavaConfig配置
现在,咱们有一个任务就是将一个csv文件的数据导入进入mysql数据库中。
csv文件位置以下图所示
mysql数据库暂时就用上面默认配置的数据库,正常应该会分库处理。
一、建立ImportProvinceJobConfig类
import org.springframework.context.annotation.Configuration;import org.springframework.beans.factory.annotation.Autowired;import lombok.extern.slf4j.Slf4j;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;@Configuration
@EnableBatchProcessing
@Slf4j
public class ImportProvinceJobConfig {@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;}
@Configuration 声明该类是配置类
@EnableBatchProcessing 自动帮你补全一些重要的有关batch工做时的属性依赖,若是不声明,则jobBuilderFactory和stepBuilderFactory会报错
@Slf4j Lombok的日志注解,至关于向ImportProvinceJobConfig 类中添加该属性private static final Logger log = LoggerFactory.getLogger(ImportProvinceJobConfig.class);
JobBuilderFactory 用于建立Job实例
StepBuilderFactory 用于建立Step实例
二、读取
从csv文件中读数据,代码以下
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;import org.springframework.core.io.ClassPathResource;/**
* 从文件读入数据
*/
@Bean
public FlatFileItemReader<Province> reader() {
return new FlatFileItemReaderBuilder<Province>()
//定义读取实例的名称
.name("provinceItemReader")
//在严格模式下,ExecutionContext若是输入资源不存在,
//则reader会抛出异常。不然,它会记录问题并继续。
//默认为true
.strict(true)
//源文件每行数据分隔符类型 分隔符为“,”
// .lineTokenizer(new DelimitedLineTokenizer())
//定义解析字符
.encoding(StandardCharsets.UTF_8.name())
//定义资源(读取数据来源)
//Spring资源文档 https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#resources
.resource(new ClassPathResource("provinces.csv"))
//文件顶部忽略的行数
.linesToSkip(0)
//linesToSkip设置为2,则调用此handler接口两次
//.skippedLinesCallback(handler)
.delimited()
//定义字段对应
.names(provinceSet)
//转换为对象
.fieldSetMapper(new ProvinceFieldSetMapper())
.build();
}
ATP:Flat File是一种包含没有相对关系结构的记录的文件(作用逗号分隔数值(CSV)的文件)
FlatFileItemReader 该类提供了读取和解析Flat File的基本功能,该类核心 在于Resource
和LineMapper
其中Resource:
Spring Core Resource 相关文档
LineMapper:给定当前行和与之关联的行号,映射器应返回结果域对象
本代码中LineMapper 具体实现 由DelimitedLineTokenizer(默认) 和 ProvinceFieldSetMapper实现
DelimitedLineTokenizer 功能是将csv文件中每行文件以逗号隔开生成String数组
ProvinceFieldSetMapper 是由咱们自行定义,代码以下
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
import person.leon.batch.springbatchdemo.entity.Province;
/**
* 从文件读取 转换为对象
*
* @author: Leon
* @date: 2019/6/29 17:24
*/
public class ProvinceFieldSetMapper implements FieldSetMapper<Province> {
@Override
public Province mapFieldSet(FieldSet fieldSet) throws BindException {
Province province = new Province();
province.setId(fieldSet.readLong("id"));
province.setCreateTime(fieldSet.readDate("createTime", "yyyy-MM-dd HH:mm:ss"));
province.setUpdateTime(fieldSet.readDate("updateTime", "yyyy-MM-dd HH:mm:ss"));
province.setProvinceId(fieldSet.readString("provinceId"));
province.setProvinceName(fieldSet.readString("provinceName"));
province.setDisplay(fieldSet.readBoolean("display"));
province.setApkChannel(fieldSet.readString("apkChannel"));
province.setBsChannel(fieldSet.readString("bsChannel"));
return province;
}
}
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.util.Date;
/**
* 省份信息
*
* @author leon
* @date 2019/4/25 11:09
*/
@Data
public class Province {
private Long id;
private Date createTime;
private Date updateTime;
private String provinceId;
private String provinceName;
private boolean display;
private String apkChannel;
private String bsChannel;
}
该类实现接口FieldSetMapper的mapFieldSet方法,其中FieldSet为csv文件中读取出来的字符串数组
该类的主要做用在于将读取出来的字符创转换为对象。
三、转换
将读取数据生成的对象进行转换,由于没有特别的要求,本次直接返回原对象
@Bean
public ProvinceItemProcessor processor() {
return new ProvinceItemProcessor();
}
/**
* 转换
*/
private class ProvinceItemProcessor implements ItemProcessor<Province, Province> {
@Override
public Province process(Province item) throws Exception {
return item;
}
}
四、写入
/**
* 插入语句
*/
private String insertSql = "INSERT INTO province " +
" (id, create_time, update_time, province_id, province_name, display, apk_channel, bs_channel) " +
" VALUES (" +
":id, :createTime, :updateTime, :provinceId, :provinceName, :display, :apkChannel, :bsChannel)";
/**
* 写入数据库
*
* @param dataSource
* @return
*/
@Bean
public JdbcBatchItemWriter<Province> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Province>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql(insertSql)
.dataSource(dataSource)
.build();
}
JdbcBatchItemWriter 使用批处理功能 NamedParameterJdbcTemplate
来执行全部提供的项目的批处理语句
五、构建Job和Step
@Bean
@Qualifier("importProvinceJob")
public Job importProvinceJob(
JobAroundListener listener,
@Qualifier("importProvinceStep") Step step1) {
return jobBuilderFactory.get("importProvinceJob")
// 配置Job不支持再次启动(此时从新启动会抛出JobRestartException异常)
// 默认支持从新启动
//.preventRestart()
// job参数声明验证器
.validator(new DefaultJobParametersValidator())
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
@Qualifier("importProvinceStep")
public Step step1(JdbcBatchItemWriter<Province> writer) {
return stepBuilderFactory.get("importProvinceStep")
.<Province, Province>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
配置Job实例Bean和Step实例Bean,Job跟Step是一对多的关系。
2、运行Job
两种运行方式:
1)启动SpringBoot项目,会自动执行Job;
2)经过http请求调用Job。
为了让项目启动时不执行Job,在application.properties中添加配置
spring.batch.job.enabled=false
在浏览地址栏输入localhost:8080/run
验证执行结果,链接上mysql数据库,查询表province,查看有没有数据,其次查看SpringBatch相关表中有没有数据。