详解应对平台高并发的分布式调度框架TBSchedule
tbschedule是一款很是优秀的高性能分布式调度框架,很是高兴能分享给你们。这篇文章是我结合多年tbschedule使用经验和研读三遍源码的基础上完成的,期间和阿里空玄有过很多技术交流,很是感谢空玄给予的大力支持。我写这篇文章的目的一是出于对tbschedule的一种热爱,二是如今是一个资源共享、技术共享的时代,但愿把它展示给你们(送人玫瑰,手留余香),能给你们的工做带来帮助。
1、tbschedule初识
时下互联网和电商领域,各个平台都存在大数据、高并发的特色,对数据处理的要求愈来愈高,既要保证高效性,又要保证安全性、准确性。tbschedule的使命就是将调度做业从业务系统中分离出来,下降或者是消除和业务系统的耦合度,进行高效异步任务处理。其实在互联网和电商领域tbschedule的使用很是普遍,目前被应用于阿里巴巴、淘宝、支付宝、京东、聚美、汽车之家、国美等不少互联网企业的流程调度系统。
在深刻了解tbschedule以前咱们先从内部和外部形态对它有个初步认识,如图1.一、图1.2。
从tbschedule的内部形态来讲,与他有关的关键词包括批量任务、动态扩展、多主机、多线程、并发、分片……,这些词看起来很是的高大上,都是时下互联网技术比较流行的词汇。从tbschedule的外部架构来看,一目了然,宿主在调度应用中与zookeeper进行通讯。一个框架结构是不是优秀的,从美感的角度就能够看出来,一个好的架构必定是隐藏了内部复杂的原理,外部视觉上美好的,让用户使用起来简单易懂。
2、tbschedule原理
为何TBSchedule值得推广呢?
传统的调度框架spring task、quartz也是能够进行集群调度做业的,一个节点挂了能够将任务漂移给其余节点执行从而避免单点故障,可是不支持分布式做业,一旦达到单机处理极限也会存在问题。
elastic-job支持分布式,是一个很好的调度框架,可是开源时间较短,尚未经历大范围市场考验。
Beanstalkd基于C语言开发,使用范围较小,没法引入到php、java系统平台。
tbschedule到底有多强大呢?我对tbschedule的优点特色进行了以下总结:
一、支持集群、分布式
二、灵活的任务分片
三、动态的服务扩容和资源回收
四、任务监控支持
tbschedule支持cluster,能够宿主在多台服务器多个线程组并行进行任务调度,或者说能够将一个大的任务拆成多个小任务分配到不一样的服务器。tbschedule的分布式机制是经过灵活的sharding方式实现的,好比能够按全部数据的ID按10取模分片(分片规则如图2.1)、按月份分片等等,根据不一样的需求,不一样的场景由客户端配置分片规则。咱们知道spring task、quarz也是能够进行集群调度做业的,一个节点挂了能够将任务漂移给其余节点执行从而避免单点故障,可是不支持分布式做业,一旦达到单机处理极限也会存在问题,这个就是tbschedule的优点。而后就是tbschedule的宿主服务器能够进行动态扩容和资源回收,这个特色主要是由于它后端依赖的zookeeper,这里的zookeeper对于tbschedule来讲是一个nosql,用于存储数据,它的数据结构相似文件系统的目录结构,它的节点有临时节点、持久节点之分。调度引擎上线后,随着业务量数据量的增多,当前cluster可能不能知足目前的处理需求,那么就须要增长服务器数量,一个新的服务器上线后会在zk中建立一个表明当前服务器的一个惟一性路径(临时节点),而且新上线的服务器会和zk有长链接,当通讯断开后,节点会自动摘除。tbschedule会定时扫描当前服务器的数量,从新分配任务。tbschedule不只提供了服务端的高性能调度服务,还提供了一个scheduleConsole war随着宿主应用的部署直接部署到服务器,能够经过web的方式对调度的任务、策略进行监控,实时更新。
是否是已经对tbschedule稍微了有些好感呢?咱们接着往下看。
tbschedule提供了两个核心组件ScheduleServer、TbscheduleManagerFactory和两类核心接口IScheduleTaskDeal、IScheduleTaskDealSingle、IScheduleTaskDealMuti,这两部分是客户端研发的关键部分,是使用tbschedule必需要了解的。
ScheduleServer即任务处理器,的主要做用是任务和策略的管理、任务采集和执行,由一组工做线程组成,这组工做线程是基于队列实现的,进行任务抓取和任务处理(two mode)。每一个任务处理器和zookeeper有一个心跳通讯链接,用于检测server的状态和进行任务动态分配,举个例子,好比3服务器的worker集群执行出票消息生成任务,这对这个任务类型每台服务器能够配置一个ScheduleSever(即一个线程组),也能够配置两个线程组,至关于6台服务器在执行这个任务。当某台服务器宕机或者其余缘由与zk通讯断开时,他的任务将被其余服务器接管。ScheduleServer参数定义如图2.2
在这些参数中taskItems是一个很是重要的属性,是客户单能够自由发挥的地方,是任务分片的基础,好比咱们处理一个任务能够根据ID按10取模,那么任务项就是0-9,3台服务器分别拿到四、 三、 3个任务项,服务器的上下线都会对任务项进行从新分配。任务项是进行任务分配的最小单位。一个任务项只能由一个ScheduleServer来进行处理,但一个Server能够处理任意数量的任务项。这就是刚才咱们说的分片特性。
调度服务器TbscheduleManagerFactory的主要工做zookeeper链接参数配置和zookeeper的初始化、调度管理。
两类核心接口是须要被咱们定义的目标任务实现的,根据本身的须要进行任务采集(重写selectTasks方法)和任务执行(重写execute方法),这类接口是客户端研发自由发挥的地方。
接下来咱们深刻了解下tbschedule,看看它的内部是如何实现的。下面流程图是我花了不少心血经过一周时间画出来的,基本是清晰的展示了tbschedule内部的执行流程以及每一个步骤zookeer节点路径和数据的变化。由于图中的注释已经描述的很详细了,每一个节点右侧是zk的信息(数据结构见图2.3),这里就再也不作过多的文字描述了,有任何建议或者不明白的地方能够找我交流。
Tbschedule还有个强大之处是它提供了两种处理器模式模式:
一、SLEEP模式
当某一个线程任务处理完毕,从任务池中取不到任务的时候,检查其它线程是否处于活动状态。若是是,则本身休眠;若是其它线程都已经由于没有任务进入休眠,当前线程是最后一个活动线程的时候,就调用业务接口,获取须要处理的任务,放入任务池中,同时唤醒其它休眠线程开始工做。
二、NOTSLEEP模式
当一个线程任务处理完毕,从任务池中取不到任务的时候,当即调用业务接口获取须要处理的任务,放入任务池中。
SLEEP模式内部逻辑相对较简单,若是遇到大任务须要处理较长时间,可能会形成其余线程被动阻塞的状况。但其实生产环境通常都是小而快的任务,即便出现阻塞的状况ScheduleConsole也会及时的监控到。NOTSLEEP模式减小了线程休眠的时间,避免了因大任务形成阻塞的状况,但为了不数据被重复处理,增长了CPU在数据比较上的开销。TBSchedule默认是SLEEP模式。
到目前为止我相信你们对tbschedule有了一个深入的了解,心中的疑雾逐渐散开了。理论是实践的基础,实践才是最终的目的,下一节咱们将结合理论知识进行tbschedule实战.
3、tbschedule实战
在项目中使用tbschedule须要依赖zookeeper、tbschedule
zookeeper依赖:
Java代码
- <dependency>
- <groupId>org.apache.zookeeper</groupId
- <artifactId>zookeeper</artifactId>
- <version>3.4.6</version>
- </dependency>
-
tbschedule依赖:
Java代码
- <dependency>
- <groupId>com.taobao.pamirs.schedule</groupId
- <artifactId>tbschedule</artifactId>
- <version>3.3.3.2</version>
- </dependency>
-
tbschedule有三种引入方式:
一、经过ScheduleConsole引入
Tbschedule随着宿主调度应用部署到服务器后,能够经过web浏览器的方式访问其提供监控平台。
第一步,初始化zookeeper
第二步,建立调度策略
第三步,建立调度任务
第四步,监控调度任务
二、经过原生java引入
Java代码
- // 初始化Spring
- ApplicationContext ctx = new FileSystemXmlApplicationContext(
- "spring-config.xml");
-
- // 初始化调度工厂
- TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();
-
- Properties p = new Properties();
- p.put("zkConnectString", "127.0.0.1:2181");
- p.put("rootPath", "/taobao-schedule/train_worker");
- p.put("zkSessionTimeout", "60000");
- p.put("userName", "train_dev");
- p.put("password", " train_dev ");
- p.put("isCheckParentPath", "true");
-
- scheduleManagerFactory.setApplicationContext(ctx);
-
- scheduleManagerFactory.init(p);
-
- // 建立任务调度任务的基本信息
- String baseTaskTypeName = "DemoTask";
- ScheduleTaskType baseTaskType = new ScheduleTaskType();
- baseTaskType.setBaseTaskType(baseTaskTypeName);
- baseTaskType.setDealBeanName("demoTaskBean");
- baseTaskType.setHeartBeatRate(10000);
- baseTaskType.setJudgeDeadInterval(100000);
- baseTaskType.setTaskParameter("AREA=BJ,YEAR>30"); baseTaskType.setTaskItems(ScheduleTaskType.splitTaskItem(
- "0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4}," +
- "4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8}," +
- "8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"));
- baseTaskType.setFetchDataNumber(500);
- baseTaskType.setThreadNumber(5);
- this.scheduleManagerFactory.getScheduleDataManager()
- .createBaseTaskType(baseTaskType);
- log.info("建立调度任务成功:" + baseTaskType.toString());
-
- // 建立任务的调度策略
- String taskName = baseTaskTypeName;
- String strategyName =taskName +"-Strategy";
- try {
- this.scheduleManagerFactory.getScheduleStrategyManager()
- .deleteMachineStrategy(strategyName,true);
- } catch (Exception e) {
- e.printStackTrace();
- }
- ScheduleStrategy strategy = new ScheduleStrategy();
- strategy.setStrategyName(strategyName);
- strategy.setKind(ScheduleStrategy.Kind.Schedule);
- strategy.setTaskName(taskName);
- strategy.setTaskParameter("china");
-
- strategy.setNumOfSingleServer(1);
- strategy.setAssignNum(10);
- strategy.setIPList("127.0.0.1".split(","));
- this.scheduleManagerFactory.getScheduleStrategyManager()
- .createScheduleStrategy(strategy);
- log.info("建立调度策略成功:" + strategy.toString());
三、经过spring容器引入
Java代码
- <!-- 初始化zookeeper -->
- <bean id="scheduleManagerFactory"
- class="com.xx.TBScheduleManagerFactory" init-method="init">
- <property name="zkConfig">
- <map>
- <entry key="zkConnectString" value="127.0.0.1:2181" />
- <entry key="rootPath" value="/taobao-schedule/train_worker" />
- <entry key="zkSessionTimeout" value="60000" />
- <entry key="userName" value="train_dev" />
- <entry key="password" value="train_dev" />
- <entry key="isCheckParentPath" value="true" />
- </map>
- </property>
- </bean>
- <!-- 配置调度策略 凌晨1点到3点执行 -->
- <bean id="abstractDemoScheduleTask" class="com.xx.core.tbschedule.InitMyScheduleTask" abstract="true">
- <property name="scheduleTaskType.heartBeatRate" value="10000" />
- <property name="scheduleTaskType.judgeDeadInterval" value="100000" />
- <property name="scheduleTaskType.permitRunStartTime" value="0 0 1 * * ?"/>
- <property name="scheduleTaskType.permitRunEndTime" value="0 0 3 * * ?"/>
- <property name="scheduleTaskType.taskParameter" value="AREA=BJ,YEAR>30" />
- <property name="scheduleTaskType.sleepTimeNoData" value="60000"/>
- <property name="scheduleTaskType.sleepTimeInterval" value="60000"/>
- <property name="scheduleTaskType.fetchDataNumber" value="500" />
- <property name="scheduleTaskType.executeNumber" value="1" />
- <property name="scheduleTaskType.threadNumber" value="5" />
- <property name="scheduleTaskType.taskItems">
- <list>
- <value>0:{TYPE=A,KIND=1}</value>
- <value>1:{TYPE=A,KIND=2}</value>
- <value>2:{TYPE=A,KIND=3}</value>
- <value>3:{TYPE=A,KIND=4}</value>
- <value>4:{TYPE=A,KIND=5}</value>
- <value>5:{TYPE=A,KIND=6}</value>
- <value>6:{TYPE=A,KIND=7}</value>
- <value>7:{TYPE=A,KIND=8}</value>
- <value>8:{TYPE=A,KIND=9}</value>
- <value>9:{TYPE=A,KIND=10}</value>
- </list>
- </property>
- <property name="scheduleStrategy.kind" value="Schedule" />
- <property name="scheduleStrategy.numOfSingleServer" value="1" />
- <property name="scheduleStrategy.assignNum" value="10" />
- <property name="scheduleStrategy.iPList">
- <list>
- <value>127.0.0.1</value>
- </list>
- </property>
- </bean>
- <!-- 配置调度任务 -->
- <bean id="demoTask" class="com.xx.worker.task.DemoTask" parent="abstractDemoScheduleTask">
- <property name="scheduleTaskType.baseTaskType" value="demoTask" />
- <property name="scheduleTaskType.dealBeanName" value="demoTaskBean" />
- <property name="scheduleStrategy.strategyName" value="demoTaskBean-Strategy" />
- <property name="scheduleStrategy.taskName" value="demoTaskBean" />
- </bean>
调度任务具体实现 DemoTask.java
Java代码
- /**
- * DemoTask任务类
- */
- ublic class DemoTask mplements
- IScheduleTaskDealSingle,TScheduleTaskDeal {
-
- /**
- * 数据采集
- * @param taskItemNum--分配的任务项 taskItemList--总任务项
- * eachFetchDataNum--采集任务数量
- */
- @Override
- public List<DemoTask> selectTasks(String taskParameter,
- String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList,
- int eachFetchDataNum) throws Exception {
- List<DemoTask> taskList = new LinkedList<DemoTask>();
- //客户端根据条件进行数据采集start
-
- //客户端根据条件进行数据采集end
- return rt;
- }
- **
- * 数据处理
- */
- @Override
- public boolean execute(DemoTask task, String ownSign)
- throws Exception {
- //客户端pop任务进行处理start
-
- //客户端pop任务进行处理end
- return true;
- }
其实咱们看对于tbscchedule客户端的使用很是简单,初始化zk、配置调度策略、调度任务,对调度任务进行实现,就这几个步骤。如今能够庆祝下了,你又掌握了一个优秀开源框架的设计思想和使用方式。
4、tbschedule挑战
任何事物都是没有最好只有更好,tbschedule也同样,虽然它如今已经很完美了,咱们不能放弃对更完美的追求。阿里团队能够在下面几个方面进行优化。
一、ScheduleConsole监控页面优化,目前ScheduleConsole监控页面过于简单,需完善UI设计,提升用户体验。
二、Zookeeper集群自动切换,避免zk服务的集群点多故障
三、原生zookeeper操做替换为curator,Curator对ZooKeeper进行了一次包装,对原生ZooKeeper的操做作了大量优化,Client和Server之间的链接可能出现的问题处理等等,能够进一步提升TBSchedule的高可用。
四、帮助文档较少,网上的资料基本是千篇一概,但愿有更多的爱好者加入进来。
至此,咱们已经完成了对tbschedule的所有介绍,尽快使用起来吧!
个人博客:http://mycolababy.iteye.com/
欢迎关注本站公众号,获取更多信息