Spring Batch JobExecutionDecider

    根据不同的支付渠道选择不同的step进行处理,主要通过实现JobExecutionDecider接口,返回不同的FlowExecutionStatus来决定step分支。

    其大体实现:

    一、maven依赖:

<!-- Maven dependencies used by the decider job example. -->
<dependencies>
    <!-- Spring Batch core library. -->
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>3.0.6.RELEASE</version>
    </dependency>
    <!-- NOTE(review): this pulls the "sources" classifier of spring-batch-samples at
         1.1.4.RELEASE, which does not match spring-batch-core 3.0.6.RELEASE above;
         confirm that both the classifier and the version are intended. -->
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-samples</artifactId>
        <version>1.1.4.RELEASE</version>
        <classifier>sources</classifier>
    </dependency>
    <!-- MySQL JDBC driver (the dataSource bean uses com.mysql.jdbc.Driver). -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.5</version>
    </dependency>
    <!-- Commons DBCP (the dataSource bean is an org.apache.commons.dbcp.BasicDataSource). -->
    <dependency>
        <groupId>commons-dbcp</groupId>
        <artifactId>commons-dbcp</artifactId>
        <version>1.4</version>
    </dependency>
</dependencies>

    二、Reader实现

/**
 * Created by heyinbo on 2016/8/17.
 * Abstract reader that lazily loads a data set and then hands its items out one at a time.
 *
 * <p>Thread-safety: the lazy load and the cursor advance both happen while holding one
 * private lock, so concurrent callers of {@link #doReader()} never receive the same index.
 *
 * @param <T> type of the items handed out
 */
public abstract class PayBillItemReader<T> implements ItemReader<T>, InitializingBean {
    /**
     * Index of the next item to hand out. Only accessed while holding {@code lock}.
     */
    private int current = 0;
    /**
     * The data set. It is {@code null} until something assigns it; {@link #doDownLoad()}
     * is invoked whenever it is still {@code null} at read time.
     */
    protected volatile List<T> result;

    private final Object lock = new Object();

    /**
     * Returns the next item of the data set, triggering {@link #doDownLoad()} first when
     * no data set has been assigned yet.
     *
     * <p>The cursor only moves forward when an item is actually returned, so calls made
     * while no data is available (or after the data is exhausted) do not skip items and
     * do not let the cursor grow without bound.
     *
     * @return the next item, or {@code null} when there is no data or all items were consumed
     */
    public T doReader() {
        synchronized (lock) {
            if (null == result) {
                doDownLoad();
            }
            // Snapshot the volatile field once so the bounds check and the get() see the same list.
            List<T> data = result;
            if (null != data && current < data.size()) {
                return data.get(current++);
            }
            return null;
        }
    }

    /**
     * Hook invoked by {@link #doReader()} while {@link #result} is {@code null}.
     * Implementations are expected to assign {@link #result}.
     */
    public abstract void doDownLoad();

    /**
     * No-op; present to satisfy {@code InitializingBean}.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
    }
}
/**
 * Created by heyinbo on 2016/8/17.
 * Concrete reader implementation producing {@code PayBillItem} objects.
 */
public class WXPayBillItemReader extends PayBillItemReader<PayBillItem> {

    private String id;
    /** Download date. */
    private Date loadDate;
    /** Payment channel. */
    private String payWay;

    /** @return the configured id */
    public String getId() {
        return this.id;
    }

    /** @param id the id to use */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the download date */
    public Date getLoadDate() {
        return this.loadDate;
    }

    /** @param loadDate the download date to use */
    public void setLoadDate(Date loadDate) {
        this.loadDate = loadDate;
    }

    /** @return the payment channel */
    public String getPayWay() {
        return this.payWay;
    }

    /** @param payWay the payment channel to use */
    public void setPayWay(String payWay) {
        this.payWay = payWay;
    }

    /**
     * Download hook required by the parent class; currently a placeholder.
     */
    @Override
    public void doDownLoad() {
        // TODO download the reconciliation statement
    }

    /**
     * Reads the next item by delegating to the parent's {@code doReader()}.
     *
     * @return whatever {@code doReader()} returns for this call
     */
    @Override
    public PayBillItem read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        final PayBillItem next = doReader();
        return next;
    }
}

    三、writer实现

/**
 * Writer for {@code PayBillItem} chunks. The persistence logic is still a placeholder.
 */
public class PayBillItemWriter implements ItemWriter<PayBillItem> {

    /**
     * Writes one chunk of items.
     *
     * @param items the items of the current chunk; typed instead of a raw {@code List} so the
     *              compiler checks element access
     * @throws Exception declared by the {@code ItemWriter} contract; the current body throws nothing
     */
    @Override
    public void write(List<? extends PayBillItem> items) throws Exception {
        // TODO write database

        // To test the transaction handling, throw here, e.g.:
        //        throw new DemoException("database rollback");
    }
}

    四、JobExecutionDecider实现

/**
 * Created by heyinbo on 2016/8/17.
 * 根据JobParameters传入的值决策流程执行分支
 */
public class PayBillJobExecutionDecider implements JobExecutionDecider {

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
     //根据传入的参数决定step,具体参考配置文件
        String payWay = jobExecution.getJobParameters().getString("payWay");
        return new FlowExecutionStatus(payWay);
    }
}  

    五、测试运行

public class DeciderJobLaunch {
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch-decider.xml");
        JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("deciderJob");
        try {
            /* 运行Job */
            JobExecution result = launcher.run(job, new JobParametersBuilder()
                    .addString("id", "10010")
                    .addString("payWay", "weixin")
                    .addDate("loadDate", new Date())
                    .toJobParameters());
            /* 处理结束,控制台打印处理结果 */
            System.out.println(result.getExitStatus().toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}  

    六、batch相关配置

<!-- Job launcher bean (looked up by id "jobLauncher" in DeciderJobLaunch); wired to the jobRepository bean. -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
   <property name="jobRepository" ref="jobRepository"/>
</bean>

<!-- For testing: all job state is kept in memory. This alternative is currently disabled. -->
<!--<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
</bean>

<bean id="transactionManager"
     class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>-->

<!-- Database-backed job repository. Original note: several system tables are created by
     default to store the job execution state.
     NOTE(review): confirm that the tables really are created automatically in this setup. -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
   <property name="dataSource" ref="dataSource" />
   <property name="transactionManager" ref="transactionManager"/>
   <property name="databaseType" value="mysql" />
</bean>
<!-- Data source: a DBCP BasicDataSource whose close() method is registered as destroy-method.
     NOTE(review): the JDBC URL and the root/root credentials are hard-coded here, presumably
     as demo values; externalize them before any real use. -->
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
     destroy-method="close">
   <property name="driverClassName" value="com.mysql.jdbc.Driver" />
   <property name="url" value="jdbc:mysql://192.168.10.1:3306/pay?characterEncoding=UTF8"/>
   <property name="username" value="root"/>
   <property name="password" value="root"/>
</bean>

<!-- Transaction management: a DataSourceTransactionManager bound to the dataSource bean. -->
<bean id="transactionManager"
     class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
   <property name="dataSource" ref="dataSource" />
</bean>
<bean:import resource="spring-application.xml"/>

<!-- Job flow: step "first" runs, then the "decision" element asks the "decider" bean.
     Status "weixin" routes to "wx_pay_step"; any other status routes to "end". -->
<job id="deciderJob">
   <step id="first" next="decision">
      <tasklet ref="firstTasklet" />
   </step>
   <decision id="decision" decider="decider">
      <next on="weixin" to="wx_pay_step" />
      <next on="*" to="end" />
   </decision>
   <step id="wx_pay_step">
      <!-- Transaction manager used by this step -->
      <tasklet transaction-manager="transactionManager">
         <!-- commit-interval=1000: every 1000 records are handed to the writer and committed together -->
         <!-- retry-limit=2: number of retries, applied when an exception listed in retryable-exception-classes occurs -->
         <chunk reader="reader" processor="processer" writer="writer" commit-interval="1000" retry-limit="2">
            <retryable-exception-classes>
               <include class="com.test.batch.simple.DemoException" />
            </retryable-exception-classes>
         </chunk>
      </tasklet>
   </step>
   <step id="end">
      <tasklet ref="endTasklet" />
   </step>
</job>

<!-- Decider referenced by the "decision" element of deciderJob. -->
<bean:bean id="decider" class="com.test.batch.simple.decider.PayBillJobExecutionDecider" />

<!-- scope=step: ensures the values in jobParameters can be passed into the bean -->
<bean:bean id="reader" class="com.test.batch.simple.decider.WXPayBillItemReader" scope="step">
   <bean:property name="id" value="#{jobParameters['id']}" />
   <bean:property name="loadDate" value="#{jobParameters['loadDate']}" />
   <bean:property name="payWay" value="#{jobParameters['payWay']}" />
</bean:bean>

<!-- Processor and writer referenced by the chunk of wx_pay_step. -->
<bean:bean id="processer" class="com.test.batch.simple.decider.WXPayBillItemProcesser" />

<bean:bean id="writer" class="com.test.batch.simple.decider.PayBillItemWriter" />

<!-- Tasklets for the "first" and "end" steps, each configured with a message property. -->
<bean:bean id="firstTasklet" class="com.test.batch.simple.tasklet.FirstTasklet">
   <bean:property name="message" value="hello spring batch" />
</bean:bean>

<bean:bean id="endTasklet" class="com.test.batch.simple.tasklet.EndTasklet">
   <bean:property name="message" value="bye spring batch" />
</bean:bean>

    以上就是整个job的处理流程,其中包括异常处理以及事务管理。

备注:优化对账方式,将数据初始化到redis服务中,通过redis的sdiff命令进行比对筛选差集。目前采用redis是基于其扩展方便,利于后期针对不同渠道进行扩展。

相关文章
相关标签/搜索