根据不同的支付渠道选择不同的step进行处理,主要通过实现JobExecutionDecider接口,返回不同的FlowExecutionStatus来决定step分支。
其大体实现:
一、maven依赖:
<dependencies>
    <!-- Spring Batch core -->
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>3.0.6.RELEASE</version>
    </dependency>

    <!-- Spring Batch samples (sources classifier) -->
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-samples</artifactId>
        <version>1.1.4.RELEASE</version>
        <classifier>sources</classifier>
    </dependency>

    <!-- MySQL JDBC driver used by the dataSource bean -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.5</version>
    </dependency>

    <!-- Commons DBCP connection pool -->
    <dependency>
        <groupId>commons-dbcp</groupId>
        <artifactId>commons-dbcp</artifactId>
        <version>1.4</version>
    </dependency>
</dependencies>
二、Reader实现
/**
 * Created by heyinbo on 2016/8/17.
 * Abstract reader that hands out the items of a lazily loaded data set one at a time.
 *
 * <p>{@link #doReader()} triggers {@link #doDownLoad()} while {@link #result} is still
 * {@code null} and then returns the items of {@link #result} in list order, followed by
 * {@code null} once every item has been handed out.
 *
 * <p>The load check and the cursor advance both happen under one private lock, so
 * concurrent callers of {@link #doReader()} never receive the same element twice and
 * never skip one.
 *
 * @param <T> type of the items held in the data set
 */
public abstract class PayBillItemReader<T> implements ItemReader<T>, InitializingBean {

    /** Index of the next item to hand out; only read and written while holding {@link #lock}. */
    private int current = 0;

    /**
     * The data set to read from. Presumably assigned by {@link #doDownLoad()}
     * implementations; stays {@code null} until then.
     */
    protected volatile List<T> result;

    /** Guards the lazy load and the cursor. */
    private final Object lock = new Object();

    /**
     * Returns the next unread item of the data set.
     *
     * @return the next item, or {@code null} when the data set is still {@code null}
     *         after the load attempt or every item has already been returned
     */
    public T doReader() {
        synchronized (lock) {
            if (null == result) {
                doDownLoad();
            }
            // Snapshot the volatile field so the bounds check and the get() see the same list.
            List<T> data = result;
            if (null != data && current < data.size()) {
                // Advance only when an item is actually returned, so the cursor neither
                // runs ahead of a data set that is not loaded yet nor grows without bound.
                return data.get(current++);
            }
            return null;
        }
    }

    /**
     * Loads the data set. Called from {@link #doReader()} while {@link #result} is
     * {@code null}, with the internal lock held.
     */
    public abstract void doDownLoad();

    /** No initialization is required by default. */
    @Override
    public void afterPropertiesSet() throws Exception {
    }
}
/** * Created by heyinbo on 2016/8/17. * 数据读取的具体实现 * */ public class WXPayBillItemReader extends PayBillItemReader<PayBillItem> { private String id; /** * 下载日期 */ private Date loadDate; /** * 支付渠道 */ private String payWay; @Override public void doDownLoad() { //TODO 下载对帐单 } @Override public PayBillItem read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { return doReader(); } public Date getLoadDate() { return loadDate; } public void setLoadDate(Date loadDate) { this.loadDate = loadDate; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getPayWay() { return payWay; } public void setPayWay(String payWay) { this.payWay = payWay; } }
三、Writer实现
public class PayBillItemWriter implements ItemWriter<PayBillItem> { @Override public void write(List items) throws Exception { //TODO write database //测试事务 // throw new DemoException("database rollback"); } }
四、JobExecutionDecider实现
/** * Created by heyinbo on 2016/8/17. * 根据JobParameters传入的值决策流程执行分支 */ public class PayBillJobExecutionDecider implements JobExecutionDecider { @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { //根据传入的参数决定step,具体参考配置文件 String payWay = jobExecution.getJobParameters().getString("payWay"); return new FlowExecutionStatus(payWay); } }
五、测试运行
/**
 * Command-line entry point that launches {@code deciderJob} once and prints its exit status.
 */
public class DeciderJobLaunch {

    /**
     * Boots the Spring context from {@code spring-batch-decider.xml}, runs the job with a
     * fixed set of parameters and prints the resulting exit status.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("spring-batch-decider.xml");
        try {
            JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
            Job job = (Job) context.getBean("deciderJob");

            try {
                /* Run the job */
                JobExecution result = launcher.run(job, new JobParametersBuilder()
                        .addString("id", "10010")
                        .addString("payWay", "weixin")
                        .addDate("loadDate", new Date())
                        .toJobParameters());
                /* Done: print the outcome to the console */
                System.out.println(result.getExitStatus().toString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            // Close the context so singleton destroy callbacks run
            // (for example a pooled data source declared with destroy-method="close").
            context.close();
        }
    }
}
六、batch相关配置
<!-- Data source -->
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="jdbc:mysql://192.168.10.1:3306/pay?characterEncoding=UTF8"/>
    <property name="username" value="root"/>
    <property name="password" value="root"/>
</bean>

<!-- Transaction management -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
</bean>

<!-- For testing only: keeps all job state in memory -->
<!--<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> </bean> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>-->

<!-- Database-backed repository: job execution state is kept in system tables in the data source above.
     NOTE(review): the original note says these tables are created by default; confirm, the schema
     scripts may have to be applied to the database by hand. -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="transactionManager" ref="transactionManager"/>
    <property name="databaseType" value="mysql" />
</bean>

<!-- Launcher used by the test program to start jobs -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository"/>
</bean>
<bean:import resource="spring-application.xml"/>

<job id="deciderJob">
    <step id="first" next="decision">
        <tasklet ref="firstTasklet" />
    </step>

    <!-- Branch on the status name returned by the decider bean -->
    <decision id="decision" decider="decider">
        <next on="weixin" to="wx_pay_step" />
        <next on="*" to="end" />
    </decision>

    <step id="wx_pay_step">
        <!-- Transaction manager used by this step -->
        <tasklet transaction-manager="transactionManager">
            <!-- commit-interval=1000: every 1000 records are handed to the writer and committed -->
            <!-- retry-limit=2: retry count, applied to the exceptions listed in retryable-exception-classes -->
            <chunk reader="reader" processor="processer" writer="writer"
                   commit-interval="1000" retry-limit="2">
                <retryable-exception-classes>
                    <include class="com.test.batch.simple.DemoException" />
                </retryable-exception-classes>
            </chunk>
        </tasklet>
    </step>

    <step id="end">
        <tasklet ref="endTasklet" />
    </step>
</job>

<bean:bean id="decider" class="com.test.batch.simple.decider.PayBillJobExecutionDecider" />

<!-- scope=step: lets the jobParameters values below be passed in when the step runs -->
<bean:bean id="reader" class="com.test.batch.simple.decider.WXPayBillItemReader" scope="step">
    <bean:property name="id" value="#{jobParameters['id']}" />
    <bean:property name="loadDate" value="#{jobParameters['loadDate']}" />
    <bean:property name="payWay" value="#{jobParameters['payWay']}" />
</bean:bean>

<bean:bean id="processer" class="com.test.batch.simple.decider.WXPayBillItemProcesser" />

<bean:bean id="writer" class="com.test.batch.simple.decider.PayBillItemWriter" />

<bean:bean id="firstTasklet" class="com.test.batch.simple.tasklet.FirstTasklet">
    <bean:property name="message" value="hello spring batch" />
</bean:bean>

<bean:bean id="endTasklet" class="com.test.batch.simple.tasklet.EndTasklet">
    <bean:property name="message" value="bye spring batch" />
</bean:bean>
以上就是整个job的处理流程,其中包括异常处理以及事务管理。
备注:优化对账方式,将数据初始化到redis服务中,通过redis的sdiff命令进行比对筛选差集。目前采用redis是基于其扩展方便,利于后期针对不同渠道进行扩展。