Using Readers in Spring Batch, the Enterprise Batch Processing Framework

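Spring Batch is designed for enterprise workloads where the data logic is fairly simple, the work is highly repetitive, and the data volume is large. You can see this from the range of Readers it provides; at least, that is how I understand it. It is best suited to jobs such as data cleansing, moving data after analysis, or scheduled exchanges with other systems.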

In the previous article, I used JdbcPagingItemReader to read data from an HSQLDB database.

<bean id="sysAppStoreMapper" class="net.dbatch.mapper.SysAppStoreMapper" />

<bean id="dbReader"
      class="org.springframework.batch.item.database.JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="rowMapper" ref="sysAppStoreMapper"/>
    <property name="queryProvider" ref="appQueryProvider"/>
</bean>

<bean id="appQueryProvider"
      class="org.springframework.batch.item.database.support.HsqlPagingQueryProvider">
    <property name="selectClause" value="a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER, a.SEQ"/>
    <property name="fromClause" value="sys_appstore a"/>
    <property name="sortKey" value="SEQ"/>
</bean>

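In fact, Spring Batch provides many Readers, and any class that implements the org.springframework.batch.item.ItemReader interface can serve as a custom Reader. Most of the time you don't need to go to that trouble, though, because Spring Batch has already done the work for you. The 2.1.8 API includes ready-made Reader implementations for the commonly used sources: databases [Hibernate/iBATIS/JDBC], the file system, JMS messages, and so on (see figure).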
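To make the custom Reader idea concrete, here is a minimal sketch in plain Java. `SimpleReader` is a hypothetical stand-in that mirrors the one-method shape of the ItemReader contract (a `read()` that returns the next item, or `null` once the input is exhausted). It is simplified: the real interface also declares checked exceptions. `CustomReaderSketch` and `ListBackedReader` are illustrative names, not Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class CustomReaderSketch {

    // Hypothetical stand-in for the reader contract: one item per call,
    // null when there is nothing left to read.
    interface SimpleReader<T> {
        T read();
    }

    // Illustrative custom reader that serves items from an in-memory list.
    static final class ListBackedReader<T> implements SimpleReader<T> {
        private final Iterator<T> iterator;

        ListBackedReader(List<T> items) {
            // Copy so later changes to the caller's list do not affect reading.
            this.iterator = new ArrayList<>(items).iterator();
        }

        @Override
        public T read() {
            return iterator.hasNext() ? iterator.next() : null;
        }
    }

    // Drains a reader the way a step would: call read() until it returns null.
    static <T> List<T> drain(SimpleReader<T> reader) {
        List<T> out = new ArrayList<>();
        for (T item = reader.read(); item != null; item = reader.read()) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        SimpleReader<String> reader =
                new ListBackedReader<>(Arrays.asList("User1", "User2", "User3"));
        System.out.println(drain(reader)); // prints [User1, User2, User3]
    }
}
```

The null return is the only signal that the input has ended, which is why a reader of this shape cannot carry null items.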



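Users who like Spring JDBC [I really dislike Hibernate myself] can use JdbcPagingItemReader and then specify a queryProvider. A queryProvider is a paging implementation for a particular database, and ready-made queryProviders exist for the commonly used databases (see figure).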
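The paging idea can be sketched without a database. The following plain-Java illustration assumes that each page is fetched as "the next N rows whose sort key is greater than the last key already read", and that sort keys are unique. The class and method names are hypothetical, and the real query providers express this in each database's own SQL rather than in Java.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class KeysetPagingSketch {

    // Returns up to pageSize keys greater than lastKey, in ascending order.
    // sortedKeys stands in for a table already ordered by its sort key.
    static List<Integer> nextPage(List<Integer> sortedKeys, Integer lastKey, int pageSize) {
        List<Integer> page = new ArrayList<>();
        for (Integer key : sortedKeys) {
            if (page.size() == pageSize) {
                break;
            }
            if (lastKey == null || key > lastKey) {
                page.add(key);
            }
        }
        return page;
    }

    // Fetches page after page, remembering the last key seen, until a page is empty.
    static List<List<Integer>> readAllPages(List<Integer> sortedKeys, int pageSize) {
        List<List<Integer>> pages = new ArrayList<>();
        Integer lastKey = null;
        while (true) {
            List<Integer> page = nextPage(sortedKeys, lastKey, pageSize);
            if (page.isEmpty()) {
                break;
            }
            pages.add(page);
            lastKey = page.get(page.size() - 1);
        }
        return pages;
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(readAllPages(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3));
    }
}
```

This also hints at why the sort key matters: if two rows shared the same key, a "greater than the last key" condition could skip one of them at a page boundary.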



OK. If you really can't find an implementation above for the database you use, but you know your database's SQL, you can use JdbcCursorItemReader. This Reader lets you set the SQL yourself.

Taking my example above, rewriting it with JdbcCursorItemReader is very simple:

<bean id="dbReader"
      class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="sql" value="select a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER from sys_appstore a order by a.SEQ" />
    <property name="rowMapper" ref="sysAppStoreMapper" />
</bean>

It still works just as well, and it is simpler too.

What if my data comes from a file rather than a database?

Did you notice FlatFileItemReader among the Reader implementations just now? It reads files [text files].

Suppose I want to analyze log data with this structure:

User1,20
User2,21
User3,22
User4,23
User5,24
User6,25
User7,26
User8,27
User9,28
User10,29

These are structured text files, so this is easy to implement. The Spring configuration:

<bean id="delimitedLineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" />

<bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
    <property name="lineTokenizer" ref="delimitedLineTokenizer" />
    <property name="fieldSetMapper">
        <bean class="net.dbatch.sample.UserMapper" />
    </property>
</bean>

<bean id="messageReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="lineMapper" ref="lineMapper" />
    <property name="resource" value="classpath:/users.txt" />
</bean>

Then write the corresponding mapper class:

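import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class UserMapper implements FieldSetMapper<User> {

    // Maps one tokenized line to a User: field 0 is the name, field 1 is the age.
    public User mapFieldSet(FieldSet fs) throws BindException {
        User u = new User();
        u.setName(fs.readString(0));
        u.setAge(fs.readInt(1));
        return u;
    }
}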
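To show roughly what the tokenizer and the mapper do to one line between them, here is a plain-Java sketch without Spring. It assumes comma-delimited input with exactly two fields and no quoting. `LineMappingSketch`, `parse`, and the nested `User` class are illustrative names, and this `User` only approximates the bean used above.

```java
final class LineMappingSketch {

    // Minimal stand-in for the User bean: a name and an age.
    static final class User {
        final String name;
        final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Step 1 (tokenize): split the line on commas.
    // Step 2 (map): read field 0 as a string and field 1 as an int.
    static User parse(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 2) {
            throw new IllegalArgumentException(
                    "Expected 2 fields but got " + fields.length + ": " + line);
        }
        return new User(fields[0].trim(), Integer.parseInt(fields[1].trim()));
    }

    public static void main(String[] args) {
        User u = parse("User1,20");
        System.out.println(u.name + " is " + u.age); // prints User1 is 20
    }
}
```

Keeping the two steps separate is what lets the same mapper be reused if the delimiter or file layout changes: only the tokenizing step would need to differ.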

Processor:

import org.springframework.batch.item.ItemProcessor;
import org.springframework.util.StringUtils;

public class MessagesItemProcessor implements ItemProcessor<User, Message> {

    public Message process(User user) throws Exception {
        // Reject records without a user name.
        if (!StringUtils.hasText(user.getName())) {
            throw new RuntimeException("The user name is required!");
        }
        Message m = new Message(); // Message is a simple wrapper around User
        m.setUser(user);
        m.setContent("Hello " + user.getName()
                + ",please pay promptly at end of this month.");
        return m;
    }
}

Writer:

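import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class MessagesItemWriter implements ItemWriter<Message> {

    // Called once per chunk with all the items of that chunk.
    public void write(List<? extends Message> messages) throws Exception {
        System.out.println("write results");
        for (Message m : messages) {
            System.out.println(m.getContent()); // just print it
        }
    }
}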

Test code:

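import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.task.SyncTaskExecutor;

// The wrapper class name is illustrative.
public class MessageJobMain {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext c = new ClassPathXmlApplicationContext("localfile_job.xml");
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository((JobRepository) c.getBean("jobRepository"));
        launcher.setTaskExecutor(new SyncTaskExecutor()); // run the job in the calling thread
        try {
            // run.month matches the job parameter that appears in the logged output.
            JobExecution je = launcher.run((Job) c.getBean("messageJob"),
                    new JobParametersBuilder().addString("run.month", "2011-10").toJobParameters());
            System.out.println(je);
            System.out.println(je.getJobInstance());
            System.out.println(je.getStepExecutions());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}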

Output:

10-20 15:28:32 INFO [job.SimpleStepHandler] - <Executing step: [messageStep]>
write results
Hello User1,please pay promptly at end of this month.
Hello User2,please pay promptly at end of this month.
Hello User3,please pay promptly at end of this month.
Hello User4,please pay promptly at end of this month.
Hello User5,please pay promptly at end of this month.
write results
Hello User6,please pay promptly at end of this month.
Hello User7,please pay promptly at end of this month.
Hello User8,please pay promptly at end of this month.
Hello User9,please pay promptly at end of this month.
Hello User10,please pay promptly at end of this month.
10-20 15:28:32 INFO [support.SimpleJobLauncher] - <Job: [FlowJob: [name=messageJob]] completed with the following parameters: [{run.month=2011-10}] and the following status: [COMPLETED]>
JobExecution: id=0, version=2, startTime=Sat Oct 20 15:28:32 CST 2012, endTime=Sat Oct 20 15:28:32 CST 2012, lastUpdated=Sat Oct 20 15:28:32 CST 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]]
JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]
[StepExecution: id=1, version=5, name=messageStep, status=COMPLETED, exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=3, rollbackCount=0, exitDescription=]

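The log shows clearly that lines are read and passed to the Processor one at a time, and that the results are written in a single batch after every 5 reads. The commit-interval attribute of the chunk element inside the tasklet controls this value.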
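The read/process/write rhythm seen in the log can be imitated in a few lines of plain Java. This is a simplified sketch, not Spring Batch's actual implementation: it ignores transactions, retry, and skip handling, and `ChunkLoopSketch` and its `run` method are made-up names. It buffers items until the commit interval is reached, then hands the whole buffer to the writer at once.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class ChunkLoopSketch {

    // Processes items in chunks of commitInterval and returns each written
    // chunk, so the caller can see how many writes happened.
    static List<List<String>> run(List<String> input, int commitInterval) {
        if (commitInterval < 1) {
            throw new IllegalArgumentException("commitInterval must be >= 1");
        }
        List<List<String>> writes = new ArrayList<>();
        Iterator<String> reader = input.iterator();
        while (reader.hasNext()) {
            List<String> chunk = new ArrayList<>();
            // Read and process one item at a time until the chunk is full
            // or the input runs out.
            while (chunk.size() < commitInterval && reader.hasNext()) {
                String name = reader.next();   // read
                chunk.add("Hello " + name);    // process
            }
            writes.add(chunk);                 // write once per chunk
        }
        return writes;
    }

    public static void main(String[] args) {
        List<String> users = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            users.add("User" + i);
        }
        for (List<String> chunk : run(users, 5)) {
            System.out.println("write results: " + chunk.size() + " items");
        }
    }
}
```

With 10 items and a commit interval of 5, the sketch performs two writes of five items each, which lines up with the two "write results" groups in the log. Changing the interval to 3 would give four writes, the last one holding a single item.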

The full Spring job configuration:

<batch:job id="messageJob" restartable="true">
    <batch:step id="messageStep">
        <batch:tasklet>
            <!-- commit-interval: number of items handed to the writer per chunk -->
            <batch:chunk reader="messageReader"
                         processor="messageProcessor"
                         writer="messageWriter"
                         commit-interval="5"
                         retry-limit="2">
                <batch:retryable-exception-classes>
                    <batch:include class="java.lang.RuntimeException" />
                </batch:retryable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>