Spring Batch_Intercepting Step Execution_Configuring a SkipListener
For how to configure skip, see: http://my.oschina.net/xinxingegeya/blog/346244
First, look at the inheritance hierarchy of the StepListener.java interface:
StepExecutionListener represents the most generic listener for Step execution. It allows for notification before a Step is started and after it has ended, whether it ended normally or failed.
ItemReadListener, ItemProcessListener, and ItemWriteListener all provide mechanisms for being notified of errors, but none will inform you that a record has actually been skipped. onWriteError, for example, will be called even if an item is retried and successful. For this reason, there is a separate interface for tracking skipped items:
public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);

    void onSkipInProcess(T item, Throwable t);

    void onSkipInWrite(S item, Throwable t);

}
onSkipInRead will be called whenever an item is skipped while reading. It should be noted that rollbacks may cause the same item to be registered as skipped more than once. onSkipInWrite will be called when an item is skipped while writing. Because the item has been read successfully (and not skipped), it is also provided the item itself as an argument.
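The three callbacks can be sketched as a listener that records one line per skipped item. This is only an illustration: the `SkipListener` interface below is a local stand-in copied from the signature quoted above so the example compiles without Spring Batch, and `RecordingSkipListener` is a hypothetical name, not a class from the library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local stand-in mirroring the SkipListener interface quoted above, so the
// sketch compiles without Spring Batch on the classpath. In a real project
// you would implement org.springframework.batch.core.SkipListener instead.
interface SkipListener<T, S> {
    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);
}

// Hypothetical listener (the name is an assumption) that keeps one record per skip.
class RecordingSkipListener<T, S> implements SkipListener<T, S> {
    private final List<String> records = new ArrayList<>();

    @Override
    public void onSkipInRead(Throwable t) {
        // No item is passed in: the read itself failed, so only the cause is known.
        records.add("READ: " + t.getMessage());
    }

    @Override
    public void onSkipInProcess(T item, Throwable t) {
        // The input item (type T) was read successfully, so it can be recorded.
        records.add("PROCESS: " + item + " (" + t.getMessage() + ")");
    }

    @Override
    public void onSkipInWrite(S item, Throwable t) {
        // The output item (type S) that could not be written.
        records.add("WRITE: " + item + " (" + t.getMessage() + ")");
    }

    public List<String> getRecords() {
        return Collections.unmodifiableList(records);
    }
}
```

Note the asymmetry: onSkipInRead receives only the exception, while the other two callbacks also receive the item, typed as the processor input (T) or the writer input (S).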
SkipListeners and Transactions
One of the most common use cases for a SkipListener is to log out a skipped item, so that another batch process or even human process can be used to evaluate and fix the issue leading to the skip. Because there are many cases in which the original transaction may be rolled back, Spring Batch makes two guarantees:
The appropriate skip method (depending on when the error happened) will only be called once per item.
The SkipListener will always be called just before the transaction is committed. This is to ensure that any transactional resources called by the listener are not rolled back by a failure within the ItemWriter.
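Why the timing of the callback matters can be seen in a toy model. The sketch below is not Spring Batch's implementation. `ToyTransaction`, `DeferredSkipDemo`, and the rule that items starting with "bad" fail on write are all assumptions made up for the illustration. It only shows the idea behind the two guarantees: skips are remembered across rollbacks and reported once, inside the transaction that finally commits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy transaction: writes stay pending until commit() and vanish on rollback().
class ToyTransaction {
    private final List<String> pending = new ArrayList<>();
    private final List<String> database;

    ToyTransaction(List<String> database) {
        this.database = database;
    }

    void write(String row) { pending.add(row); }

    void rollback() { pending.clear(); }

    void commit() {
        database.addAll(pending);
        pending.clear();
    }
}

class DeferredSkipDemo {
    // Writes one chunk. Any item starting with "bad" fails on write (an
    // assumption made only for this demo) and is left out on the next attempt.
    static List<String> runChunk(List<String> chunk) {
        List<String> database = new ArrayList<>();
        ToyTransaction tx = new ToyTransaction(database);
        Set<String> skipped = new LinkedHashSet<>();

        boolean failed;
        do {
            failed = false;
            for (String item : chunk) {
                if (skipped.contains(item)) {
                    continue; // already marked as skipped on an earlier attempt
                }
                if (item.startsWith("bad")) {
                    skipped.add(item); // remember the skip, but do not notify yet
                    tx.rollback();     // everything written in this attempt is lost
                    failed = true;
                    break;
                }
                tx.write(item);
            }
        } while (failed);

        // Just before commit: one notification per skipped item, written inside
        // the transaction that will actually be committed.
        for (String item : skipped) {
            tx.write("SKIPPED:" + item);
        }
        tx.commit();
        return database;
    }

    public static void main(String[] args) {
        System.out.println(runChunk(Arrays.asList("a", "bad1", "b", "bad2")));
    }
}
```

Running main prints [a, b, SKIPPED:bad1, SKIPPED:bad2]. Had the skip record been written at the moment of failure, the following rollback would have discarded it together with the failed writes.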
A chunk is defined as the items processed within the scope of a transaction. Committing a transaction, at each commit interval, commits a 'chunk'. A ChunkListener can be useful to perform logic before a chunk begins processing or after a chunk has completed successfully:
public interface ChunkListener extends StepListener {

    void beforeChunk();

    void afterChunk();

}
The beforeChunk method is called after the transaction is started, but before read is called on the ItemReader. Conversely, afterChunk is called after the chunk has been committed (and not at all if there is a rollback).
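Because afterChunk is not called on rollback, comparing the two callback counts gives the number of chunks that did not commit. The sketch below assumes the two-method ChunkListener signature quoted above (reproduced as a local stand-in so it compiles without Spring Batch). `ChunkCounter` is a hypothetical name.

```java
// Local stand-in mirroring the ChunkListener interface quoted above. In a real
// project you would implement org.springframework.batch.core.ChunkListener,
// whose method signatures may differ between Spring Batch versions.
interface ChunkListener {
    void beforeChunk();
    void afterChunk();
}

// Hypothetical listener (the name is an assumption) that counts chunk callbacks.
class ChunkCounter implements ChunkListener {
    private int started;
    private int committed;

    @Override
    public void beforeChunk() {
        started++; // called after the transaction starts, before the first read
    }

    @Override
    public void afterChunk() {
        committed++; // called only after a successful commit
    }

    public int getStarted() { return started; }

    public int getCommitted() { return committed; }

    // Chunks that started but never reached afterChunk, i.e. were rolled back
    // (or are still in progress when this is called).
    public int getNotCommitted() { return started - committed; }
}
```

With commit-interval="2", as in the configuration used later in this post, each pair of callbacks brackets at most two items.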
Several listeners were listed above. So how are they used? Take SkipListener, whose interface is shown above, as an example.
We pick SkipListenerSupport and extend it to implement a SkipListener with our own logic, as follows:
MySkipListener.java
package com.lyx.batch3;

import org.springframework.batch.core.listener.SkipListenerSupport;

import com.lyx.batch.People;
import com.lyx.batch.PeopleDESC;

public class MySkipListener extends SkipListenerSupport<People, PeopleDESC> {

    @Override
    public void onSkipInRead(Throwable t) {
        super.onSkipInRead(t);
        System.out.println(">>>>>>>>>>>>>>skip in read>>>>>>>>>>>>>>");
    }

    @Override
    public void onSkipInWrite(PeopleDESC item, Throwable t) {
        super.onSkipInWrite(item, t);
        System.out.println(">>>>>>>>>>>>>>skip in write>>>>>>>>>>>>>>");
        System.out.println(">>>>=" + item.toString());
    }

    /**
     * Called when the processor throws an exception listed under skip include.
     */
    @Override
    public void onSkipInProcess(People item, Throwable t) {
        super.onSkipInProcess(item, t);
        System.out.println(">>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>");
        System.out.println(">>>>=" + item.toString());
    }
}
The configuration file is as follows:
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Package scanning -->
    <context:component-scan base-package="com.lyx.batch" />

    <bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

    <batch:step id="abstractStep" abstract="true">
        <batch:listeners>
            <batch:listener ref="exceptionHandler" />
        </batch:listeners>
    </batch:step>

    <bean id="abstractCursorReader" abstract="true"
        class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- add people desc job begin -->
    <batch:job id="addPeopleDescJob">
        <batch:step id="addDescStep" parent="abstractStep">
            <batch:tasklet>
                <batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
                    writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
                    <batch:skippable-exception-classes>
                        <!-- batch:include lists the exceptions that may be skipped -->
                        <batch:include class="com.lyx.batch.InvalidDataException" />
                    </batch:skippable-exception-classes>
                    <batch:listeners>
                        <!-- Multiple listeners can be configured here -->
                        <batch:listener ref="sampleSkipListener" />
                    </batch:listeners>
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
        <!-- Listeners that monitor the job while it runs -->
        <batch:listeners>
            <batch:listener ref="sampleListener" />
        </batch:listeners>
    </batch:job>
    <!-- add people desc job end -->

    <bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
    <bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />

    <bean id="peopleAddDescReader" parent="abstractCursorReader" scope="step">
        <property name="sql">
            <value><![CDATA[select first_name ,last_name from people where first_name like ? or last_name like ?]]></value>
        </property>
        <property name="rowMapper" ref="peopleRowMapper" />
        <property name="preparedStatementSetter" ref="preparedStatementSetter" />
        <property name="fetchSize" value="20" />
    </bean>

    <bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
    <bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
    <bean id="allowSkipProcessor" class="com.lyx.batch.AllowSkipProcessor" />
    <bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

    <!-- tomcat jdbc pool data source configuration -->
    <bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
        destroy-method="close">
        <property name="poolProperties">
            <bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
                <property name="driverClassName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://localhost:3306/test" />
                <property name="username" value="root" />
                <property name="password" value="034039" />
            </bean>
        </property>
    </bean>

    <!-- Spring Batch jobRepository configuration -->
    <batch:job-repository id="jobRepository"
        data-source="dataSource" transaction-manager="transactionManager"
        isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
        max-varchar-length="1000" />

    <!-- Spring transaction manager -->
    <bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- batch launcher -->
    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
</beans>
The key part of the configuration is:
<!-- add people desc job begin -->
<batch:job id="addPeopleDescJob">
    <batch:step id="addDescStep" parent="abstractStep">
        <batch:tasklet>
            <batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
                writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
                <batch:skippable-exception-classes>
                    <!-- batch:include lists the exceptions that may be skipped -->
                    <batch:include class="com.lyx.batch.InvalidDataException" />
                </batch:skippable-exception-classes>
                <batch:listeners>
                    <!-- Multiple listeners can be configured here -->
                    <batch:listener ref="sampleSkipListener" />
                </batch:listeners>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
    <!-- Listeners that monitor the job while it runs -->
    <batch:listeners>
        <batch:listener ref="sampleListener" />
    </batch:listeners>
</batch:job>
<!-- add people desc job end -->

<bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
<bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />
Run it:
AppMain12.java
package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Tests the step listener / skip listener.
 *
 * @author Lenovo
 */
public class AppMain12 {

    public static void main(String[] args)
            throws JobExecutionAlreadyRunningException, JobRestartException,
            JobInstanceAlreadyCompleteException, JobParametersInvalidException {

        long startTime = System.currentTimeMillis(); // record the start time

        @SuppressWarnings("resource")
        ApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "classpath:spring-batch-exception-listener.xml" });

        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        Job job = (Job) context.getBean("addPeopleDescJob");
        JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");

        // Launch the job with empty parameters and inspect its exit status.
        JobExecution result = launcher.run(job, jobParametersBuilder.toJobParameters());
        ExitStatus es = result.getExitStatus();
        if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
            System.out.println("任务正常完成"); // "Job completed normally"
        } else {
            System.out.println("任务失败,exitCode=" + es.getExitCode()); // "Job failed, exitCode="
        }

        long endTime = System.currentTimeMillis(); // record the end time
        System.out.println("程序运行时间: " + (endTime - startTime) + "ms"); // "Program run time: "
    }
}
Output:
.........................................................................
process people desc
skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!
process people desc
skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!
>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>
>>>>=firstName: lyx, lastName: lyx
>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>
>>>>=firstName: lyx, lastName: lyx
process people desc
skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!
>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>
>>>>=firstName: lyx, lastName: lyx
job success.........
十一月 19, 2014 3:00:06 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run
信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
任务正常完成
程序运行时间: 8779ms
Summary: with a skip listener, each skip that occurs is captured as an event by the listener, so the skipped data can be recorded for further processing.
====================END====================