快速使用组件-spring batch(3)读文件数据到数据库

tags: springbatchhtml


1.引言

上一篇文章《快速了解组件-spring batch(2)之helloworld》Spring Batch进行了入门级的开发,也对基本的组件有了必定的了解。但实际开发过程当中,更多的是涉及文件及数据库的操做,以定时后台运行的方式,实现批处理操做。典型操做是从文本数据(csv/txt等文件)中读取数据,而后写入到数据库存储。以下图所示:java

读文件流程

若须要开发此过程,能够按照上一篇文章所写的,自定义ItemReaderItemWriter来实现,可是Spring Batch其实已经提供现成的文件读取和数据库写入的组件,开发人员能够直接使用,提升开发效率。本文将会对文件读取和数据库写入进行实战介绍。mysql

2.开发环境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 开发IDE: IDEA
  • 构建工具Maven: 3.3.9
  • 日志组件logback:1.2.3
  • lombok:1.18.6

3.Spring Batch提供的读-处理-写组件一览

在使用Spring Batch内置的读写组件时,首先咱们先弄清楚有哪些组件能够用,按读、写、处理,见下面说明。Spring Batch已提供了比较全面的支持。git

3.1 ItemReader

ItemReader 说明
ListItemReader 读取List类型数据,只能读一次
ItemReaderAdapter ItemReader适配器,能够复用现有的读操做
FlatFileItemReader 读Flat类型文件
StaxEventItemReader 读XML类型文件
JdbcCursorItemReader 基于JDBC游标方式读数据库
HibernateCursorItemReader 基于Hibernate游标方式读数据库
StoredProcedureItemReader 基于存储过程读数据库
JpaPagingItemReader 基于Jpa方式分页读数据库
JdbcPagingItemReader 基于JDBC方式分页读数据库
HibernatePagingItemReader 基于Hibernate方式分页读取数据库
JmsItemReader 读取JMS队列
IteratorItemReader 迭代方式的读组件
MultiResourceItemReader 多文件读组件
MongoItemReader 基于分布式文件存储的数据库 MongoDB读组件
Neo4jItemReader 面向网络的数据库Neo4j的读组件
ResourcesItemReader 基于批量资源的读组件,每次读取返回资源对象 AmqpItemReader读取AMQP队列组件
RepositoryItemReader 基于 Spring Data的读组件

3.2 ItemWriter

ItemWriter 说明
FlatFileItemWriter 写Flat类型文件
MultiResourceItemWriter 多文件写组件
StaxEventItemWriter 写XML类型文件
AmqpItemWriter 写AMQP类型消息
ClassifierCompositeItemWriter 根据 Classifier路由不一样的Item到特定的ItemWriter处理
HiberateItemWriter 基于Hibernate方式写数据库
ItemWriterAdapter ItemWriter适配器,能够复用现有的写服务
JdbcBatchItemWriter 基于JDBC方式写数据库
JmsItemWriter 写JMS队列 JpaItemWriter基于Jpa方式写数据库
GemfireItemWriter 基于分布式数据库Gemfire的写组件
SpELMappingGemfireItemWriter 基于Spring表达式语言写分布式数据库Gemfire的写组件
MimeMessageItemWriter 发送邮件的写组件
MongoItemWriter 基于分布式文件存储的数据库MongoDB写组件
Neo4jItemWriter 面向网络的数据库Neo4j的读组件
PropertyExtractingDelegatingItemWriter 属性抽取代理写组件:经过调用给定的 Spring Bean方法执行写入,参数由Item中指定的属性字段获取做为参数
RepositoryItemWriter基于 Spring Data的写组件
SimpleMailMessageItemWriter 发送邮件的写组件
CompositeItemWriter 条目写的组合模式,支持组装多个ItemWriter

3.3 ItemProcessor

ItemProcessor 说明
CompositeItemProcessor 组合处理器,能够封装多个业务处理服务
ItemProcessorAdapter ItemProcessor适配器,能够复用现有的业务处理服务
PassThroughItemProcessor 不作任何业务处理,直接返回读到的数据
ValidatingItemProcessor 数据校验处理器,支持对数据的校验,若是校验不经过能够进行过滤掉或者经过skip的方式跳过对记录的处理

4.开发流程

根据当前示例,从csv文件中读数据,写入到mysql数据库,只须要使用FlatFileItemReaderJdbcBatchItemWriter便可。下面对开发流程做简要说明。示例工程能够在这里获取,里面有文件resources/user-data.csv及相应的目标数据库脚本mytest.sqlgithub

4.1 建立spring batch数据库

4.1.1 建立数据库并执行sql脚本

Spring Batch的运行须要数据库的支持,以保存任务的运行状态及结果。所以须要先建立数据库。在mysql中建立名为my_spring_batch的数据库。并在此数据库中执行 Spring Batch的数据库脚本,脚本位置在spring-batch-core-4.1.2.RELEASE.jar的jar包中的\org\springframework\batch\core\schema-mysql.sql(也能够在示例工程sql文件夹中获取)。执行完成后,数据库表以下图所示:spring

数据库

4.1.2 数据库表说明

数据库共9张表,以seq结尾的是用于生成主键的。其它6张表,以batch_job开头的是存储任务的相关信息,batch_step开头的存储步骤相关信息。sql

  • jobjob instancejob execution关系 任务job是咱们说的逻辑概念,即完整的一个批处理工做,它的实例就是job instance,此任务信息是存储在batch_job_instance表中。有点相似java中的类和类实例的概念,是一对多的关系。对于每个job instance,执行的时候会生成记录存储在batch_job_execution中,表示每个job执行的实际状况。注意,这里job instancejob execution也是一对多的关系,即同一个实例有可能会执行屡次(如上一次执行失败了,后面从新再执行)。数据库

  • batch_job_execution_contextbatch_job_execution_params 存储任务执行时须要用到的上下文(以json格式存储)及运行时使用的参数。json

  • batch_step_executionbatch_step_execution_context 存储任务执行过程当中的做业步骤及运行时上下文。bash

4.1.3 建立示例目标数据库

本示例只涉及一个test_user表。建立mytest数据库库,执行mytest.sql脚本便可。

4.2 配置多数据源

通常来讲,咱们会把Spring Batch的数据存储在独立的数据库中,而实际的应用使用的则是目标数据库,所以须要配置多数据源访问。基于上一篇文章的工程进行开发。

4.2.1 添加mysql数据库依赖

<!-- 数据库相关依赖-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<scope>runtime</scope>
</dependency>
复制代码

4.2.2 配置多数据源访问

Spring Boot对多数据源的支持比较友好,配置也很简单,先在配置文件中添加数据库配置,而后在java配置文件中添加相应的注解便可。以下:

  • application.properties配置内容
# spring batch db
spring.datasource.jdbc-url=jdbc:mysql://localhost:3310/my_spring_batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.username=root
spring.datasource.password=111111
# target db
spring.target-datasource.jdbc-url=jdbc:mysql://localhost:3310/mytest?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.target-datasource.username=root
spring.target-datasource.password=111111
复制代码
  • DataSourceConfig配置内容 新建DataSourceConfig.java文件,配置多数据源,以下:
@Configuration
public class DataSourceConfig {
    @Bean("datasource")
    @ConfigurationProperties(prefix="spring.datasource")
    @Primary
    public DataSource batchDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean("targetDatasource")
    @ConfigurationProperties(prefix="spring.target-datasource")
    public DataSource targetDatasource() {
        return DataSourceBuilder.create().build();
    }
}
复制代码

这样,后面就能够直接使用datasourcetargetDatasource两个Bean进行数据库访问。

4.3 添加User实体

本实例中,读取csv文件,转为User实体,而后存储到数据库,所以须要先把User这个实体做一个定义。使用了lombokjpa的注解,以下:

@Entity
@Data
@Table(name="test_user")
public class User{
    @Id
    @GeneratedValue
    /**
     * id
     */
    private Long id;

    /**
     * 姓名
     */
    private String name;

    /**
     * 手机号
     */
    private String phone;
    ...略
复制代码

4.4 添加文件读取组件ItemReader

使用内置的FlatFileItemReader便可。以下:

@Bean
    public ItemReader file2DbItemReader(){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return new FlatFileItemReaderBuilder<User>()
                .name(funcName)
                .resource(new ClassPathResource("user-data.csv"))
// .linesToSkip(1)
                .delimited()
                .names(new String[]{"id","name","phone","title","email","gender","date_of_birth","sys_create_time","sys_create_user","sys_update_time","sys_update_user"})
                .fieldSetMapper(new UserFieldSetMapper())
                .build();
    }
复制代码

说明:

  • FlatFileItemReaderBuilder用于建立FlatFileItemReader,设置相应的行为,包括使用它来设置读取文件的位置(resource),文件分隔符(默认是','),是否跳过前面几行(linesToSkip),标识每一列对应的列名称(可与数据库的字段名一致)。设置文件字段与数据库实体字段的对应关系。
  • 设置文件字段与数据库实体字段的对应关系,使用FieldSetMapper来实现,其中FieldSet表明每一行文本数据,返回值即为实体对象。以下所示:
public class UserFieldSetMapper implements FieldSetMapper<User> {
    @Override
    public User mapFieldSet(FieldSet fieldSet) throws BindException {
        String patternYmd = "yyyy-MM-dd";
        String patternYmdHms = "yyyy-MM-dd HH:mm:ss";
        User user = new User();
        user.setId(fieldSet.readLong("id"));
        user.setName(fieldSet.readString("name"));
        user.setPhone(fieldSet.readString("phone"));
        user.setTitle(fieldSet.readString("title"));
        user.setEmail(fieldSet.readString("email"));
        user.setGender(fieldSet.readString("gender"));
        //此字段有可能为null
        String dataOfBirthStr = fieldSet.readString("date_of_birth");
        if(SyncConstants.STR_CSV_NULL.equals(dataOfBirthStr)){
            user.setDateOfBirth(null);
        }else{
            DateTime dateTime = DateUtil.parse(dataOfBirthStr, patternYmd);
            user.setDateOfBirth(dateTime.toJdkDate());
        }
        user.setSysCreateTime(fieldSet.readDate("sys_create_time",patternYmdHms));
        user.setSysCreateUser(fieldSet.readString("sys_create_user"));
        user.setSysUpdateTime(fieldSet.readDate("sys_update_time",patternYmdHms));
        user.setSysUpdateUser(fieldSet.readString("sys_update_user"));
        return user;
    }
}
复制代码

4.5 添加处理组件ItemProcessor

因为csv文本文件中的数据null值数据标识符为\N,所以能够在处理组件中进行处理,把标识符\N设置为null值。以下所示:

@Slf4j
public class File2DbItemProcessor implements ItemProcessor<User,User> {

    @Override
    public User process(User user) throws Exception {
        user.setPhone(checkStr(user.getPhone()));
        user.setTitle(checkStr(user.getTitle()));
        user.setEmail(checkStr(user.getEmail()));
        user.setGender(checkStr(user.getGender()));
        log.info(LogConstants.LOG_TAG + "item process: " +user.getName());
        return user;
    }

    public String checkStr(String dataToCheck){
        if(SyncConstants.STR_CSV_NULL.equals(dataToCheck)){
            return null;
        }
        return dataToCheck;
    }
}
复制代码

4.6 添加数据库写入组件ItemWriter

数据库写入组件使用JdbcBatchItemWriter便可,以下:

@Bean
    public ItemWriter file2DbWriter(@Qualifier("targetDatasource") DataSource datasource){
        return new JdbcBatchItemWriterBuilder<User>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user) " +
                        "VALUES (:id,:name,:phone,:title,:email,:gender,:dateOfBirth,:sysCreateTime,:sysCreateUser,:sysUpdateTime,:sysUpdateUser)")
                .dataSource(datasource)
                .build();
    }
复制代码

说明:

  • 使用JdbcBatchItemWriterBuilder进行JdbcBatchItemWriter的建立,设置插入数据库的sql语句,同时指定数据源便可。
  • @Qualifier("targetDatasource") DataSource datasource用于指定数据源
  • 使用BeanPropertyItemSqlParameterSourceProvider能够直接把读取的数据实体的属性数据做为参数填充到sql语句中,从而实现数据插入操做。

4.7 组装完整任务

通过上面的操做,可使用一个java配置,把读、写、处理组装成完整的stepjob,以下所示(详细可见示例工程文件):

File2DbBatchConfig.java

@Bean
public Job file2DbJob(Step file2DbStep,JobExecutionListener file2DbListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return jobBuilderFactory.get(funcName)
                .listener(file2DbListener)
                .flow(file2DbStep)
                .end().build();
    }
@Bean
public Step file2DbStep(ItemReader file2DbItemReader , ItemProcessor file2DbProcessor ,ItemWriter file2DbWriter){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return stepBuilderFactory.get(funcName)
                .<User,User>chunk(10)
                .reader(file2DbItemReader)
                .processor(file2DbProcessor)
                .writer(file2DbWriter)
                .build();
    }
复制代码

4.8 编写测试

参考上一文章的ConsoleJobTest,编写File2DbJobTest文件。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,File2DbBatchConfig.class})
@Slf4j
public class File2DbJobTest {

    @Autowired
    private JobLauncherService jobLauncherService;

    @Autowired
    private Job file2DbJob;

    @Test
    public void testFile2DbJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        //构建任务运行参数
        JobParameters jobParameters = JobUtil.makeJobParameters();
        //执行并显示结果
        Map<String, Object> stringObjectMap = jobLauncherService.startJob(file2DbJob, jobParameters);
        Assert.assertEquals(ExitStatus.COMPLETED,stringObjectMap.get(SyncConstants.STR_RETURN_EXITSTATUS));
    }
}
复制代码

执行后结果输出以下(exitCode=COMPLETED):

执行结果

5.总结

本文先对Spring Batch的开箱即用的ItemReaderItemWriterItemProcessor做了一个简要的概览,而后以读csv文件,处理null值,再插入到数据库的处理逻辑为案例,介绍了Spring Batch的数据库脚本,FlatFileItemReaderJdbcBatchItemWriter的使用。但愿对你们更深刻的了解Spring Batch有帮助,并能用到实践中。

参考资源

相关文章
相关标签/搜索