spring bacth

从头认识SpringBatch批处理框架--实例场景一信用卡消费对帐

场景说明html

我的使用信用卡消费,银行按期发送银行卡消费帐单,本例将模拟银行处理我的信用卡消费对帐单对帐,银行须要按期地把我的消费的记录导出成csv文件,而后交给对帐系统处理。java

主要流程spring

(从credit-card-bill-201303.csv)读取数据---->处理数据----->写数据到 outputFile文件数据库

项目结构app

项目结构说明:框架

 

  • CreditBill:信用卡消费记录领域对象
  • CreditBillProcessor:信用卡消费记录处理类
  • jobLaunch:调用批处理做业类
  • jobLaunchTest:Junit单元测试,使用Spring提供的测试框架
  • credit-card-bill-201303.csv:信用卡消费帐单文件
  • job-context.xml:定义做业基础信息
  • job.xml:定义做业文件
  • outputFile.xml:输出处理事后的信用卡消费帐单文件
项目实现步骤详解:
1.准备credit-card-bill-201303.csv对帐文件
这里咱们使用csv格式的文件,该文件的每一行表示信用卡消费记录,中间用逗号分隔,分别表示:
银行卡帐户、帐户名、消费金额、消费日期、消费场所以下图所示:
 
2.定义领域对象:
为了与帐单文件造成映射,须要新建信用卡消费记录对象 -CreditBill,主要属性:银行卡帐户、帐户名、消费金额、消费日期、消费场所
具体代码以下:
[java]  view plain  copy
 
 在CODE上查看代码片派生到个人代码片
  1. package com.my.domain;  
  2. /** 
  3.  * 信用卡消费记录领域对象 
  4.  * 该类主要用于在ItemReader读取文件数据以后转换成领域对象CreditBill, 
  5.  * 以便于ItemProcessoe和ItemWriter操做使用 
  6.  * @author wbw 
  7.  * 
  8.  */  
  9. public class CreditBill {  
  10.     /** 
  11.      * 银行帐户 
  12.      */  
  13.     private String accountID;  
  14.     /** 
  15.      * 帐户名 
  16.      */  
  17.     private String name;  
  18.     /** 
  19.      * 消费金额 
  20.      */  
  21.     private double amount;  
  22.     /** 
  23.      * 消费日期 
  24.      */  
  25.     private String date;  
  26.     /** 
  27.      * 消费场所 
  28.      */  
  29.     private String address;  
  30.     /** 
  31.      * @return the 银行帐户 
  32.      */  
  33.     public String getAccountID() {  
  34.         return accountID;  
  35.     }  
  36.     /** 
  37.      * @param 银行帐户 the accountID to set 
  38.      */  
  39.     public void setAccountID(String accountID) {  
  40.         this.accountID = accountID;  
  41.     }  
  42.     /** 
  43.      * @return the 帐户名 
  44.      */  
  45.     public String getName() {  
  46.         return name;  
  47.     }  
  48.     /** 
  49.      * @param 帐户名 the name to set 
  50.      */  
  51.     public void setName(String name) {  
  52.         this.name = name;  
  53.     }  
  54.     /** 
  55.      * @return the 消费金额 
  56.      */  
  57.     public double getAmount() {  
  58.         return amount;  
  59.     }  
  60.     /** 
  61.      * @param 消费金额 the amount to set 
  62.      */  
  63.     public void setAmount(double amount) {  
  64.         this.amount = amount;  
  65.     }  
  66.     /** 
  67.      * @return the 消费日期 
  68.      */  
  69.     public String getDate() {  
  70.         return date;  
  71.     }  
  72.     /** 
  73.      * @param 消费日期 the date to set 
  74.      */  
  75.     public void setDate(String date) {  
  76.         this.date = date;  
  77.     }  
  78.     /** 
  79.      * @return the 消费场所 
  80.      */  
  81.     public String getAddress() {  
  82.         return address;  
  83.     }  
  84.     /** 
  85.      * @param 消费场所 the address to set 
  86.      */  
  87.     public void setAddress(String address) {  
  88.         this.address = address;  
  89.     }  
  90.   
  91.       
  92. }  
3.定义job-context.xml批处理基础信息
该配置文件主要是配置做业仓库、做业调度器、事务管理器,具体代码以下:
[html]  view plain  copy
 
 在CODE上查看代码片派生到个人代码片
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"      
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.     xmlns:p="http://www.springframework.org/schema/p"      
  5.     xmlns:tx="http://www.springframework.org/schema/tx"   
  6.     xmlns:aop="http://www.springframework.org/schema/aop"      
  7.     xmlns:context="http://www.springframework.org/schema/context"      
  8.     xsi:schemaLocation="http://www.springframework.org/schema/beans    
  9.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd    
  10.     http://www.springframework.org/schema/tx    
  11.     http://www.springframework.org/schema/tx/spring-tx-3.0.xsd    
  12.     http://www.springframework.org/schema/aop    
  13.     http://www.springframework.org/schema/aop/spring-aop-3.0.xsd    
  14.     http://www.springframework.org/schema/context    
  15.     http://www.springframework.org/schema/context/spring-context-2.5.xsd"      
  16.     default-autowire="byName">  
  17.     <!--定义做业仓库SpringBatch提供了两种做业仓库来记录job执行期产生的信息:一种是内存,另外一种是数据库    -->  
  18.     <bean id="jobRepository"   
  19.         class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">  
  20.     </bean>  
  21.     <!--定义做业调度器,用来启动Job -->  
  22.     <bean id="jobLauncher"   
  23.         class="org.springframework.batch.core.launch.support.SimpleJobLauncher">  
  24.         <property name="jobRepository" ref="jobRepository"/>  
  25.     </bean>  
  26.     <!--事务管理器,用于springbatch框架在对数据操做过程当中体统事务能力-->  
  27.     <bean id="transactionManager"   
  28.         class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>  
  29. </beans>  
4.定义job.xml文件
job.xml文件主要配置批处理做业Job、Step、ItemReader(读数据)、ItemProcessoe(处理数据)、 ItemWriter(写数据) 具体的配置以下图:
[html]  view plain  copy
 
 在CODE上查看代码片派生到个人代码片
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <bean:beans xmlns="http://www.springframework.org/schema/batch"      
  3.     xmlns:bean="http://www.springframework.org/schema/beans"   
  4.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"      
  5.     xmlns:p="http://www.springframework.org/schema/p"   
  6.     xmlns:tx="http://www.springframework.org/schema/tx"      
  7.     xmlns:aop="http://www.springframework.org/schema/aop"   
  8.     xmlns:context="http://www.springframework.org/schema/context"      
  9.     xsi:schemaLocation="http://www.springframework.org/schema/beans   
  10.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd    
  11.     http://www.springframework.org/schema/tx   
  12.     http://www.springframework.org/schema/tx/spring-tx-3.0.xsd    
  13.     http://www.springframework.org/schema/aop   
  14.     http://www.springframework.org/schema/aop/spring-aop-3.0.xsd    
  15.     http://www.springframework.org/schema/context   
  16.     http://www.springframework.org/schema/context/spring-context-2.5.xsd  
  17.     http://www.springframework.org/schema/batch   
  18.     http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">  
  19.     <!--引入job-context.xml配置文件-->  
  20.     <bean:import resource="classpath:job-context.xml"/>  
  21.     <!--定义billJob billStep 包含读数据  处理数据 写数据-->  
  22.     <job id="billJob">  
  23.         <step id="billStep">  
  24.             <tasklet transaction-manager="transactionManager">  
  25.                 <!--commit-interval="2" 表示任务提交间隔的大小 此处表示每处理2条数据 进行一次写入操做-->  
  26.                 <chunk reader="csvItemReader" writer="csvItemWriter"   
  27.                     processor="creditBillProcessor" commit-interval="2">  
  28.                 </chunk>  
  29.             </tasklet>  
  30.         </step>  
  31.     </job>  
  32.     <!-- 读取信用卡帐单文件,CSV格式 -->  
  33.     <bean:bean id="csvItemReader"  
  34.         class="org.springframework.batch.item.file.FlatFileItemReader"   
  35.         scope="step">  
  36.         <!--设置读取的文件资源-->  
  37.         <bean:property name="resource"   
  38.             value="classpath:credit-card-bill-201303.csv"/>  
  39.          <!--将文本中的每行记录转换为领域对象-->  
  40.         <bean:property name="lineMapper">  
  41.             <bean:bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">  
  42.                <!--引用lineTokenizer-->  
  43.                <bean:property name="lineTokenizer" ref="lineTokenizer"/>  
  44.                 <!--fieldSetMapper根据lineTokenizer中定义的names属性映射到领域对象中去-->  
  45.                 <bean:property name="fieldSetMapper">  
  46.                     <bean:bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">  
  47.                         <bean:property name="prototypeBeanName" value="creditBill">  
  48.                         </bean:property>  
  49.                     </bean:bean>  
  50.                 </bean:property>  
  51.             </bean:bean>  
  52.         </bean:property>  
  53.     </bean:bean>  
  54.     <!-- lineTokenizer 定义文本中每行的分隔符号 以及每行映射成FieldSet对象后的name列表 -->  
  55.     <bean:bean id="lineTokenizer"   
  56.         class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">  
  57.         <bean:property name="delimiter" value=","/>  
  58.         <bean:property name="names">  
  59.             <bean:list>  
  60.                 <bean:value>accountID</bean:value>  
  61.                 <bean:value>name</bean:value>  
  62.                 <bean:value>amount</bean:value>  
  63.                 <bean:value>date</bean:value>  
  64.                 <bean:value>address</bean:value>  
  65.             </bean:list>  
  66.         </bean:property>  
  67.     </bean:bean>  
  68.       
  69.     <!-- 写信用卡帐单文件,CSV格式 -->  
  70.     <bean:bean id="csvItemWriter"   
  71.         class="org.springframework.batch.item.file.FlatFileItemWriter"   
  72.         scope="step">  
  73.         <bean:property name="resource" value="file:outputFile.csv"/>  
  74.         <bean:property name="lineAggregator">  
  75.             <bean:bean   
  76.                 class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">  
  77.                 <bean:property name="delimiter" value=","></bean:property>  
  78.                 <bean:property name="fieldExtractor">  
  79.                     <bean:bean   
  80.                         class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">  
  81.                         <bean:property name="names" value="accountID,name,amount,date,address">  
  82.                         </bean:property>  
  83.                     </bean:bean>  
  84.                 </bean:property>  
  85.             </bean:bean>  
  86.         </bean:property>  
  87.     </bean:bean>  
  88.     <!--领域对象 并标注为原型-->  
  89.     <bean:bean id="creditBill" scope="prototype"  
  90.         class="com.my.domain.CreditBill">  
  91.     </bean:bean>  
  92.     <!--负责业务数据的处理-->  
  93.     <bean:bean id="creditBillProcessor" scope="step"  
  94.         class="com.my.processor.CreditBillProcessor">  
  95.     </bean:bean>  
  96. </bean:beans>  
5.建立ItemProcessor
该类主要是用于处理ItemReader读取的数据,一般包括数据过滤、数据转换等操做,此处咱们只是简单的打印帐单信息。具体代码以下:
[java]  view plain  copy
 
 在CODE上查看代码片派生到个人代码片
  1. /** 
  2.  *  
  3.  */  
  4. package com.my.processor;  
  5.   
  6. import org.springframework.batch.item.ItemProcessor;  
  7.   
  8. import com.my.domain.CreditBill;  
  9.   
  10. /** 
  11.  * 处理数据器 
  12.  * @author wbw 
  13.  * 
  14.  */  
  15. public class CreditBillProcessor implements ItemProcessor<CreditBill, CreditBill> {  
  16.   
  17.     public CreditBill process(CreditBill bill) throws Exception {  
  18.         System.out.println("信用卡消费领域对象:"+bill.getAccountID()+"-"+bill.getName()+"-"+bill.getAmount()+"-"+bill.getAddress());  
  19.         return bill;  
  20.     }  
  21. }  
6.启动Job
咱们这里介绍两种启动运行方式,
第一种:main方法
[java]  view plain  copy
 
 在CODE上查看代码片派生到个人代码片
  1. /** 
  2.  *  
  3.  */  
  4. package com.my.batch;  
  5.   
  6. import org.springframework.batch.core.Job;  
  7. import org.springframework.batch.core.JobExecution;  
  8. import org.springframework.batch.core.JobParameters;  
  9. import org.springframework.batch.core.launch.JobLauncher;  
  10. import org.springframework.context.ApplicationContext;  
  11. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  12.   
  13. /** 
  14.  * 测试 
  15.  * @author wbw 
  16.  * 
  17.  */  
  18. public class JobLaunch {  
  19.       
  20.     /** 
  21.      * @param args 
  22.      */  
  23.     @SuppressWarnings("resource")  
  24.     public static void main(String[] args) {  
  25.         //初始化应用上下文  
  26.         ApplicationContext context = new ClassPathXmlApplicationContext("job.xml");  
  27.         //获取做业调度,根据Bean的名称从Spring的上下文获取  
  28.         JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");  
  29.         //获取任务对象  
  30.         Job job = (Job) context.getBean("billJob");  
  31.         try {  
  32.             JobExecution result = launcher.run(job, new JobParameters());  
  33.             System.out.println(result.toString());  
  34.         } catch (Exception e) {  
  35.             e.printStackTrace();  
  36.         }  
  37.     }  
  38. }  
第二种:Junit单元测试

[java]  view plain  copy
 
 在CODE上查看代码片派生到个人代码片
  1. package com.my.batch;  
  2.   
  3.   
  4. import org.junit.After;  
  5. import org.junit.Before;  
  6. import org.junit.Test;  
  7. import org.junit.runner.RunWith;  
  8. import org.springframework.batch.core.Job;  
  9. import org.springframework.batch.core.JobExecution;  
  10. import org.springframework.batch.core.JobParameters;  
  11. import org.springframework.batch.core.launch.JobLauncher;  
  12. import org.springframework.beans.factory.annotation.Autowired;  
  13. import org.springframework.beans.factory.annotation.Qualifier;  
  14.   
  15. @RunWith(SpringJUnit4ClassRunner.class)  
  16. @ContextConfiguration(locations={"job.xml"})  
  17. public class JobLaunchTest {  
  18.     @Autowired  
  19.     private JobLauncher jobLauncher;  
  20.       
  21.     @Autowired@Qualifier("billJob")  
  22.     private Job job;  
  23.   
  24.     @Before  
  25.     public void setUp() throws Exception {  
  26.     }  
  27.   
  28.     @After  
  29.     public void tearDown() throws Exception {  
  30.     }  
  31.       
  32.     @Test  
  33.     public void billJob() throws Exception {  
  34.         JobExecution result = jobLauncher.run(job, new JobParameters());            
  35.         System.out.println(result.toString());       
  36.     }  
  37. }  
执行Job后 outputFile文件已录入数据


控制台信息:
这里只显示了两条信用卡消费帐单信息,由于咱们在job.xml文件中设置任务提交间隔的大小 每处理2条数据 进行一次写入操做。以下图
 
总结:
从这个实例场景中,咱们学会了使用SpringBatch已经提供好的功能组件和基础设施,快速搭建批处理应用。了解了SpringBatch的一些基本概念:
JobRepository:做业仓库,负责Job、Step执行过程当中的状态保存;
JobLauncher:做业调度器,提供执行Job的入口;
Job:做业,由一个或多个的Step组成;
Step:做业步,Job的一个执行环节,由一个或多个的Step组成Job
Tasklet:Stp中具体执行逻辑的操做,能够重复执行,能够设置具体的同步、异步操做
Chunk:给定数量的Item集合
Item:一条数据记录
ItenReader:从数据源(文件、数据库或消息队列等)中获取Item
ItemProcessor:在Item写数据以前,对数据进行数据过滤、数据清洗、数据转换、数据校验等操做
ItemWriter:将Item数据记录批量写入数据源(文件、数据库或消息队列等)
相关文章
相关标签/搜索