批处理框架Spring Batch+spring boot+quartz

最近项目须要用到批处理,而后在网上搜了一下,搜到一篇蛮不错的文章,转载并和你们分享一下。html

转载:https://blog.csdn.net/william_jm/article/details/78964538java

1简介 1.1概述 大数据时代,数据的收集、处理、存储、分析、挖掘、检索、展现,环环相扣。其中数据处理环节是一个典型的批处理场景——按期对海量数据进行格式化,各类业务规范校验,复杂的业务逻辑处理,并经过事务的方式处理到本身的数据库中,同时还应该具有高效率,无人工干预能力。 Spring Batch的出现,很好的应对了该类需求。Spring Batch是一个轻量级的综合性批处理框架,能够应用于企业级大数据量处理系统。SpringBatch能够提供大量的,可重复的数据处理功能,包括日志/跟踪(tracing),事务管理,任务处理(processing)统计,任务重启,忽略(skip),和资源管理等功能。此外还提供了许多高级服务和特性,使之可以经过优化(optimization)和分片技术(partitioning techniques)来高效地执行超大型数据集的批处理任务。须要注意的是,Spring Batch并不提供定时之类的功能,那是quartz,Tivoli,Control-M等调度框架作的事情,它们是协做关系,而不是取代。 1.2背景 在微服务架构讨论的如火如荼之际,基于Java的批处理框架却无人问津。即便企业中一直都有批处理的需求,但因缺少一个标准的、可重用的批处理框架,导致项目/产品中出现大量一次编写,一次使用的代码片断,以及不少其余不一样的临时解决方案。 SpringSource和Accenture(埃森哲)联手协做,致力于改善这种情况。埃森哲在实现批处理架构上有着丰富的产业实践经验,SpringSource有深刻的技术开发积累,背靠Spring框架提供的编程模型,强强联合,势必创造出高质量的、市场承认的企业级java解决方案——SpringBatch,基于埃森哲数十年宝贵的经验并基于最新的软件平台(如COBOL/Mainframe,C++/Unix 及如今很是流行的Java平台)来构建的项目。Spring Batch将来将会由开源社区提交者来驱动项目的开发、加强、以及将来的路线图。而埃森哲咨询公司与SpringSource合做的目标是促进软件处理方法、框架和工具的标准化改进。 1.3场景 典型的批处理流程是读数据、处理数据、写数据的三步式架构——从数据库、文件或队列中读取大量数据,而后经过业务规则处理数据,最后将处理完的数据按需求方式写(数据库、文件等)。一般Spring Batch工做在离线模式下,不须要用户干预、就能自动进行基本的批处理迭代,进行相似事务方式的处理。 1.3.1 适用业务 Ø 按期提交批处理任务(日终处理) Ø 并发批处理:并行执行任务 Ø 分阶段,企业消息驱动处理 Ø 高并发批处理任务 Ø 失败后手动或定时重启 Ø 按顺序处理依赖任务 (使用工做流驱动的批处理插件) Ø 局部处理:跳过记录(例如在回滚时) Ø 完整的批处理事务:由于可能有小数据量的批处理或存在存储过程/脚本 1.3.2核心能力 Ø 利用Spring编程模式:使开发者专一于业务逻辑,让框架解决基础功能 Ø 明确划分在批处理基础架构、执行环境、应用 Ø 通用的核心服务以接口形式提供 Ø 提供简单的默认实现,以实现核心执行接口的“开箱即用” Ø 易于配置、定制和扩展服务 Ø 核心服务很容易扩展与替换,且不会影响基础层 Ø 简单部署模型 2关键架构与领域术语 2.1层次架构 Spring Batch的架构设计是充分考虑了系统的可扩展性和各种终端开发的普适性。下图2.1.1是Spring Batch的层次架构示意图。mysql

图2.1.1-SpringBatch层次架构图 Spring Batch架构主要分为三类高级组件: 应用层(Application), 核心层(Core) 和基础架构层(Infrastructure)。 应用层(Application):指开发人员编写的全部批处理业务做业和自定义代码。 核心层(Core):指加载和控制批处理做业所必需的核心类。含JobLauncher,Job和 Step的实现。 基础架构层(Infrastructure):应用层与核心层都构建在基础架构层之上。基础架构包括通用的readers(ItemReader)和writers(ItemWriter),以及services (如重试模块 RetryTemplate),能够被应用层和核心层所使用。 2.2领域术语 Step:表示做业Job中的一个完整业务逻辑步骤,一个Job能够有一个或者多个Step组成。 StepExecution:表示试运行一个步骤step的句柄。只有步骤step真的获得运行才会被建立。 Job(做业):做业是封装整个批处理过程的实体。一个简单的做业须要配置做业名、有序的步骤step、及是否重启。 JobInstance(做业实例):一个做业实例与其要加载的数据无硬性关联,这彻底是由数据读入器ItemReader决定。好比:是否使用同一个做业实例,是由ItemReader根据前一次执行的状态位(state)决定。用新的JobInstance意味从开头读取数据,用已有的表示从上次结束的地方开始。 JobParameter(做业参数):是指一个批量做业开始的参数集。同时,能够用于标识JobInstance的惟一性。因此能够认为JobInstance=Job+JobParameter。 JobExecution:表示试运行一个做业的句柄。 以下图2.2.1所示,Job比如是容器,能够包含多个业务逻辑步骤step与多个JobInstance,来组织做业的执行(亦能够保证做业的重启),而JobExecution则是致力于记录执行状态。每一次执行中JobExecution和step都会进行数据信息传输,好比:commitCount、rollbackCount、startTime、endTime等,这些都会记录进StepExecution。 图2.2.1-批处理框架web

运行期的模型 JobLauncher(做业调度器):是Spring Batch框架基础设施层提供运行Job的能力。对于将给定Job名称和做Job Parameters的Job,由Java程序、命令行或者其它调度框架(如Quartz)中调用JobLauncher执行Job。 JobRepository(做业仓库):来存储Job执行期的元数据(这里的元数据是指Job Instance、Job Execution、Job Parameters、Step Execution、Execution Context等数据)。有两种默认实现——内存或数据库。若将元数据存放在数据库中,能够随时监控批处理Job的执行状态。Job执行结果是成功仍是失败,而且使失败Job从新启动Job成为可能。 ItemReader:是对step的输入的抽象,每次只读入一条记录,读取完全部记录后,则返回null。 ItemProcessor:是对每条记录按业务逻辑处理的抽象。 ItemWriter:是对step的输出的抽象,每次只能够提供给一次批做业或记录队(chunk)。 下图2.2.2显示了完整的SpringBatch领域概念模型。JobLancaster启动Job,Job可有多个Step组合,每个step对应一个ItemReader、ItemProcessor及ItemWriter,而JobRepository记录Job执行信息。spring

2.2.2-Spring Batch领域概念模型sql

3实战演习 光说不练假把式,这个章节就让咱们一块儿实战操练下。 3.1What I’ll build 定时天天凌晨1点,按业务需求将TEST_TASK_PROPERTY表和DQP_TEST_FILE表数据汇总整合到表DQP_REPORT_A,即将结果数据表汇总到统计表中。 3.2What you’ll need ● Eclipse ● JDK 1.7 or later ● Maven 3.0 3.3Set up the project 本工程是由maven构建,使用SpringBoot简化复杂的依赖配置及部署,使用Quartz做为任务调度框架,SpringBatch做为批处理框架,数据持久化使用JPA。 3.3.1pom.xml文件 [html] view plain copy 120.
121. <projectxmlnsprojectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
122. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
123. 4.0.0
124. com.william.lab.springboot.springbatch
125. springbatch
126. 0.0.1-SNAPSHOT
127. jar
128. springbatch
129. Testproject for Spring Boot + Spring Batch + Quartz
130.
131.
132. org.springframework.boot
133. spring-boot-starter-parent
134. 1.5.6.RELEASE
135.
136.
137.
138.
139. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
140. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
141. <java.version>1.7</java.version>
142.
143.
144.
145.
146. org.springframework.boot
147. spring-boot-starter-batch
148.
149.
150. org.springframework.boot
151. spring-boot-starter-data-jpa
152.
153.
154. slf4j-api
155. org.slf4j
156.
157.
158. jboss-logging
159. org.jboss.logging
160.
161.
162.
163.
164. org.springframework.boot
165. spring-boot-starter-web
166.
167.
168. log4j-over-slf4j
169. org.slf4j
170.
171.
172.
173.
174. mysql
175. mysql-connector-java
176. runtime
177.
178.
179. org.springframework
180. spring-context-support
181.
182.
183. org.springframework
184. spring-tx
185.
186.
187. org.quartz-scheduler
188. quartz
189. 2.2.1
190.
191.
192. slf4j-api
193. org.slf4j
194.
195.
196.
197.
198. org.quartz-scheduler
199. quartz-jobs
200. 2.2.1
201.
202.
203. commons-lang
204. commons-lang
205. 2.6
206.
207.
208.
209. com.jcraft
210. jsch
211. 0.1.54
212.
213.
214. commons-io
215. commons-io
216. 2.4
217.
218.
219. commons-net
220. commons-net
221. 3.1
222.
223.
224. org.springframework.boot
225. spring-boot-starter-test
226. test
227.
228.
229.
230.
231.
232.
233. org.springframework.boot
234. spring-boot-maven-plugin
235.
236.
237.
238.
3.3.2Batch做业模块配置 [java] view plain copy 50. @Configuration
51. @EnableBatchProcessing
52. public class BatchConfiguration {
53. @Autowired
54. private JobBuilderFactoryjobBuilderFactory;
55. @Autowired
56. private StepBuilderFactorystepBuilderFactory;
57. @PersistenceUnit
58. private EntityManagerFactory emf;
59.
60. @StepScope
61. publicJpaPagingItemReader reader() {
62. JpaPagingItemReaderreader = new JpaPagingItemReader();
63. reader.setQueryString("selectnew TestReport(ttp.taskId, tra.fileId, ttp.ruleId,sum( tra.count))"
64. + " fromTestFile tra,TestTaskProperty ttp WHERE ttp.taskId=tra.taskId AND ttp.beginTimeBETWEEN ?1 AND ?2 "
65. + "GROUP BYttp.taskId, tra.fileId, ttp.ruleId");
66. Map<String, Object>parameterValues = new HashMap<>();
67. parameterValues.put("1",CommonUtils.getTimeSection(0, 0, 0));
68. parameterValues.put("2",CommonUtils.getTimeSection(23, 59, 59));
69. reader.setParameterValues(parameterValues);
70. reader.setEntityManagerFactory(emf);
71. reader.setPageSize(Integer.MAX_VALUE);
72. return reader;
73. }
74.
75. @Bean
76. public TestFileProcessor processor(){
77. return newTestFileProcessor();
78. }
79.
80. @Bean
81. publicJpaItemWriter writer() {
82. JpaItemWriterwriter = new JpaItemWriter();
83. writer.setEntityManagerFactory(emf);
84. return writer;
85. }
86.
87. @Bean
88. public Step step() {
89. returnstepBuilderFactory.get("step").<TestReport, TestReport>chunk(10).reader(reader()).processor(processor())
90. .writer(writer()).build();
91. }
92.
93. @Bean
94. public Job importUserJob(JobRepositoryjobRepository) {
95. returnjobBuilderFactory.get("importUserJob").incrementer(newRunIdIncrementer()).repository(jobRepository)
96. .flow(step()).end().build();
97. }
98. }
在Spring的体系中@EnableBatchProcessing 注释的工做原理与其它的带有 @Enable * 的注释相似。在这种状况下, @EnableBatchProcessing 提供了构建批处理任务的基本配置。在这个基本的配置中,除了建立了一个StepScope的实例,还能够将一系列可用的bean进行自动装配: JobRepositorybean 名称 "jobRepository" JobLauncher bean名称"jobLauncher" JobRegistry bean名称"jobRegistry" PlatformTransactionManagerbean名称 "transactionManager" JobBuilderFactorybean名称"jobBuilders" StepBuilderFactorybean名称"stepBuilders" 这种配置的核心接口是BatchConfigurer。它为以上所述的bean提供了默认的实现方式,并要求在context中提供一个bean,即DataSource。数据库链接池由被JobRepository使用。 注意只有一个配置类须要有@ enablebatchprocessing注释。只要有一个类添加了这个注释,则以上全部的bean都是可使用的。 3.3.2.1做业Job和步骤Step Step()方法是组合特定业务需求步骤的,如上章节介绍,是由reader、processor和writer组成。importUserJob()方法提供的是组合业务做业的,由Step组成,并能够由jobRepository()方法将做业持久化。 3.3.2.2做业处理单元reader、writer、processor reader()方法是读取数据的方法,这里实例化是JpaPagingItemReader()方法。JpaPagingItemReader容许您声明一个JPQL语句,并传入一个 EntityManagerFactory。而后就和其余的 ItemReader 同样,每次调用它的 read 方法都会返回一个 item。当须要更多实体,则内部就会自动发生分页。 writer()方法是将处理结果持久化进数据库的,其中JpaItemWriter是 JPA EntityManager aware 的,用来处理事务性工做,而执行实际的写入工做是委托另外一个非jpa相关的(non-"jpa aware") ItemWriter作的。 processor()方法是业务数据处理方法,以下代码段,处理了简单业务逻辑。 [java] view plain copy 10. public class TestFileProcessor implementsItemProcessor<TestReport, TestReport> {
11. private static final Logger log=LoggerFactory.getLogger(TestFileProcessor.class);
12. @Override
13. public TestReport process(finalTestReport testReport) throws Exception {
14. testReport.setTimeSection(CommonUtils.getTimeSection(0,0, 0));
15. log.info("StatisticResult 【" +testReport + "】");
16. return testReport;
17. }
18. }
3.3.3Quartz调度模块配置 3.3.3.1Trigger触发器 [java] view plain copy 55. @Component("cronTriggerFactoryBean")
56. public class CronTriggerFactoryBean {
57. @Autowired
58. private SchedulerFactoryBeanschedulerFactoryBean;
59. /** 60. * 添加或修改一个定时任务 61. */
62. public void createNewTask(Stringexpression, int taskId) throws SchedulerException {
63. TriggerKey triggerKey =TriggerKey.triggerKey("TASK-" + taskId, "JOB-" +taskId);
64. CronTrigger trigger = null;
65. // 不存在,建立一个
66. JobKey jobKey = newJobKey("TASK-" + taskId, "JOB-" + taskId);
67. JobDetail jobDetail = JobBuilder.newJob(SpringQuartzJob.class).withIdentity(jobKey).build();
68. // 稽核任务基础信息
69. jobDetail.getJobDataMap().put("taskId",taskId);
70. // 表达式调度构建器
71. CronScheduleBuildercronScheduleBuilder = null;
72. cronScheduleBuilder =CronScheduleBuilder.cronSchedule(expression);
73. // 按cronExpression表达式构建一个新的trigger
74. trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).startAt(newDate()).withSchedule(cronScheduleBuilder).build();
75. // 加入任务队列
76. Scheduler scheduler =schedulerFactoryBean.getScheduler();
77. scheduler.scheduleJob(jobDetail,trigger);
78. scheduler.rescheduleJob(triggerKey,trigger);
79. }
80. }
81. 这是一个简单生成周期任务触发器类,由任务配置接口传入任务执行周期表达式(cron表达式)和任务编号等基础信息,创建CronTrigger定时触发器,调度quartz做业类。
82. 3.3.3.2
83.
84. @Component("springQuartzJob")
85. public class SpringQuartzJob extends QuartzJobBean {
86. @Autowired
87. Job importUserJob;
88. @Autowired
89. private JobLauncher jobLauncher;
90. @Override
91. public void executeInternal(finalJobExecutionContext context) throws JobExecutionException {
92. System.out.println("TestJobStart:" + Thread.currentThread().getId());
93. try {
94. init();
95. JobParameters jobParameters= new JobParametersBuilder().addLong("time", System.currentTimeMillis())
96. .toJobParameters();
97. JobExecution result =jobLauncher.run(importUserJob, jobParameters);
98. } catch (Exception e) {
99. e.printStackTrace();
100. }
101. System.out.println("Job1End");
102. }
103.
104. public void init() {
105. importUserJob =(Job) MyApplicationContextUtil.getBeanObj("importUserJob");
106. jobLauncher =(JobLauncher) MyApplicationContextUtil.getBeanObj("jobLauncher",JobLauncher.class);
107. }
108. }
JobParameters相似与Quartz中的JobDataMap,传递做业须要的数据。 jobLauncher.run()方法是经过做业Job和做业参数JobParameters来惟一标识做业仓库中已有的做业,并执行做业。 3.3.3.3ApplicationContextAware [java] view plain copy 20. public class MyApplicationContextUtil implementsApplicationContextAware {
21. private staticApplicationContext context;
22. public static void setContext(ApplicationContextcontext) {
23. MyApplicationContextUtil.context= context;
24. }
25. @Override
26. public void setApplicationContext(ApplicationContextcontext) throws BeansException {
27. this.context =context;
28. }
29. public staticApplicationContext getContext() {
30. return context;
31. }
32. public final staticObject getBeanObj(String beanName) {
33. return context.getBean(beanName);
34. }
35. public final static Object getBeanObj(StringbeanName, Class<?> requiredType) {
36. return context.getBean(beanName,requiredType);
37. }
38. }
MyApplicationContextUtil继承了ApplicationContextAware接口,实现public void setApplicationContext(ApplicationContext context)throwsBeansException方法,获取spring配置上下文ApplicationContext,用于经过bean名字获取bean方法public final static ObjectgetBeanObj(StringbeanName)。 3.3.4SpringbatchApplication启动类 [java] view plain copy 22. @SpringBootApplication
23. @PropertySource(value = {"./application.properties" })
24. publicclass SpringbatchApplication {
25. publicstatic ConfigurableApplicationContext ctx;
26. publicstatic void main(String[] args) {
27. ctx= SpringApplication.run(new Object[] { QuartzResource.class}, args);
28. }
29. @Bean
30. publicSchedulerFactoryBean schedulerFactoryBean() throws Exception {
31. SchedulerFactoryBeanschedulerFactoryBean = new SchedulerFactoryBean();
32. PropertiesquartzProperties = new Properties();
33. FileInputStream in = newFileInputStream("./src/main/resources/quartz.properties");
34. quartzProperties.load(in);
35. schedulerFactoryBean.setQuartzProperties(quartzProperties);
36. returnschedulerFactoryBean;
37. }
38. @Bean
39. publicMyApplicationContextUtil myApplicationContextUtil() {
40. returnnew MyApplicationContextUtil();
41. }
42. }
public SchedulerFactoryBean schedulerFactoryBean()throwsException方法是用于初始化quartz配置信息quartz.properties。 3.3.5一个建立定时任务的web接口 [java] view plain copy 20. @RestController
21. @ComponentScan(basePackages= { "com.william.lab.springboot.springbatch.springbatch" })
22. @RequestMapping("/quartz")
23. public class QuartzResource {
24. private Logger LOGGER =LoggerFactory.getLogger(QuartzResource.class);
25. @Autowired
26. private CronTriggerFactoryBeancronTriggerFactoryBean;
27.
28. final int CREATE_ID = 17;
29.
30. @RequestMapping(value ="/get/{taskId}", method = RequestMethod.GET)
31. public void createTask(@PathVariable("taskId")String taskId) throws SchedulerException {
32. String str[] =taskId.split(",");
33. for (int i = 0; i< str.length; i++) {
34. int taskIdx =Integer.parseInt(str[i]);
35. cronTriggerFactoryBean.createNewTask("00/1 * * * ?", 1);
36. }
37. }
38. }
这是一个简单的接口,用户能够经过此接口定义quartz调度batch做业任务。 3.3.6配置文件application.properties与quartz.properties 3.3.6.1application.properties [plain] view plain copy 23. # Tomcatport
24. server.port=18080
25. #Spring Batch
26. spring.batch.job.enabled=false
27. # MySQL DB
28. spring.datasource.url=jdbc:mysql://localhost:3306/william_lab?useUnicode=true&characterEncoding=UTF-8
29. spring.datasource.username=root
30. spring.datasource.password=123456
31. spring.datasource.driver-class-name=com.mysql.jdbc.Driver
32. # log config
33. logging.config=file:./src/main/resources/logback-spring.xml
34. #database pool
35. spring.datasource.tomcat.max-idle=15
36. spring.datasource.tomcat.max-wait=1000
37. spring.datasource.tomcat.maxActive=50
38. spring.datasource.tomcat.min-idle=5
39. spring.datasource.tomcat.initial-size=10
40. spring.datasource.tomcat.validation-query=SELECT1
41. spring.datasource.tomcat.test-on-borrow=false
42. spring.datasource.tomcat.test-while-idle=true
43. spring.datasource.tomcat.time-between-eviction-runs-millis=18800
44. spring.datasource.tomcat.jdbc-interceptors=ConnectionState;SlowQueryReport(threshold=0)
注意:当配置文件里定义spring.batch.job.enabled为true,或者没定义(默认为true)的时候,会初始化一个JobLauncherCommandLineRunner的bean,自动执行batch配置好的做业Job。鉴于咱们将batch的做业Job调度任务交由Quartz调度,因此设置为false,这样工程启动后只会初始化batch做业配置,但不执行。 3.3.6.2quartz.properties [plain] view plain copy 27. # Configure MainScheduler Properties
28. org.quartz.scheduler.instanceName:DQPScheduler
29. org.quartz.scheduler.instanceId:AUTO
30. org.quartz.scheduler.skipUpdateCheck:false
31. # Configure ThreadPool
32. org.quartz.threadPool.class:org.quartz.simpl.SimpleThreadPool
33. org.quartz.threadPool.threadCount:1000
34. org.quartz.threadPool.threadPriority:5
35. # ConfigureJobStore
36. org.quartz.jobStore.misfireThreshold:60000
37. org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
38. org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
39. org.quartz.jobStore.useProperties:false
40. org.quartz.jobStore.dataSource:dqpDS
41. org.quartz.jobStore.tablePrefix:dqp_qrtz_
42. org.quartz.jobStore.isClustered:false
43. # Configure Datasources
44. org.quartz.dataSource.dqpDS.driver:com.mysql.jdbc.Driver
45. org.quartz.dataSource.dqpDS.URL:jdbc:mysql://localhost:3306/william_lab?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true
46. org.quartz.dataSource.dqpDS.user:root
47. org.quartz.dataSource.dqpDS.password:123456
48. org.quartz.dataSource.dqpDS.maxConnections:100
49. org.quartz.dataSource.dqpDS.validationQuery=select1
50. org.quartz.dataSource.dqpDS.idleConnectionValidationSeconds=60
51. org.quartz.dataSource.dqpDS.validateOnCheckout=true
52. org.quartz.dataSource.dqpDS.discardIdleConnectionsSeconds=60
注意:最后4行配置是保证quartz的数据库链接池中,无效连接的释放。 4总结 Spring Batch将整个批处理做业流程分了3个基础阶段:读数据、业务处理、归档结果数据,且提供了许多读数据接口(文件,jpa,jdbc、MongDB等),一样写数据接口也很丰富(文件,jpa,jdbc、MongDB等),还有日志、监控、任务重启与跳过等特性。而开发者只须要关注事务的粒度,日志监控,执行方式,资源管理,读数据,处理数据,写数据的解耦等方面。可是,Spring Batch未提供关于批处理任务调度的功能,所以如何周期性的调用批处理任务须要本身想办法解决,就Java来讲,Quartz是一个不错的解决方案,或者写脚本处理之。数据库

相关文章
相关标签/搜索