elastic-job-lite

1. 为何不用quartzjava

  经过定时任务来进行计算,若是数量很少,能够轻易的用quartz来完成,若是用户量特别大,可能短期内处理不完须要处理的数据。另外若是咱们将job直接放在咱们的webapp里,webapp一般是多节点部署的,这样,项目须要每隔一段时间执行某个定时任务,可是因为同时部署在多台机器上,所以可能会出现任务被执行屡次,形成重复数据的状况,咱们的job也就是多节点,形成了多个job同时执行,致使job重复执行,为了不这种状况,咱们可能多job的节点进行加锁,保证只有一个节点能执行,或者将job从webapp里剥离出来,独自部署一个节点。Elastic job是当当网架构师张亮,曹昊和江树建基于Zookepper、Quartz开发并开源的一个Java分布式定时任务,解决了Quartz不支持分布式的弊端。Elastic job主要的功能有支持弹性扩容,经过Zookepper集中管理和监控job,支持失效转移等,这些都是Quartz等其余定时任务没法比拟的。python

2. 原理web

  elastic底层的任务调度仍是使用quartz,经过zookeeper来动态给job节点分片,使用elastic-job开发的做业都是客户的,假如咱们须要使用3台机器跑job,咱们将任务分红3片,框架经过zk的协调,最终会让3台机器分别分配0,1,2的任务片,好比server0-->0,server1-->1,server2-->2,当server0执行时,能够只查询id%3==0的用户,server1执行时,只查询id%3==1的用户,server2执行时,只查询id%3==2的用户。当分片数为1时,在同一个zookepper和jobname状况下,多台机器部署了Elastic job时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其余的机器不执行当分片数大于1时,假若有3台服务器,分红10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。此时每台服务器可根据拿到的shardingItem值进行相应的处理,spring

举例场景:shell

假如job处理数据库中的数据业务,方法为:A服务器处理数据库中Id以0,1,2结尾的数据,B处理数据库中Id以3,4,5结尾的数据,C处理器处理6,7,8,9结尾的数据,合计处理0-9为所有数据数据库

若是服务器C崩溃,Elastic Job自动进行进行失效转移,将C服务器的分片转移到A和B服务器上,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9服务器

此时,A服务器处理数据库中Id以0,1,2,3,4结尾的数据,B处理数据库中Id以5,6,7,8,9结尾的数据,合计处理0-9为所有数据.架构

在上述基础上,若是咱们增长server3,此时,server3分不到任务分片,由于任务分片只有3片,已经分完了,没有分到任务分片的程序不执行。若是server2挂了,那么server2的任务分片会分给server3,server3有了分片后就会执行。若是server3也挂了,框架会自动将server3的分片随机分给server0或server1,这种特性称之为弹性扩容,也就是elastic-job的由来。app

  elastic-job不支持单机多实例,经过zk的协调分片是以ip为单元的,若是经过单机多实例来试验,结果会致使分片和预期不一致,能够经过虚拟机模拟多台机器。框架

3. 做业类型

  elastic-job 提供了三种类型的做业:simple,dataflow,script。script类型做业为脚本类型做业,支持shell,python等类型脚本。simple类型须要实现SimpleJob接口,未通过任何封装,和quartz原生接口类似。dataflow类型用于处理数据流,须要实现DafaflowJob接口,该接口提供了两个方法能够覆盖,分别用于抓取fetchData和处理processData数据。

4. 代码演示

1. 依赖

<dependency>
			<groupId>com.dangdang</groupId>
			<artifactId>elastic-job-lite-spring</artifactId>
			<version>2.1.5</version>
		</dependency>

 

2.编写job

public class OrderStatisticsJob implements SimpleJob {

	private static final Logger log = LoggerFactory.getLogger(OrderStatisticsJob.class);

	OrdersService ordersSerivce = null;

	/** 读取配置(配置文件之后上分布式配置动态维护) **/
	private void readConfig() {
		ordersSerivce = (OrdersService) ApplicationHelp.getBean("ordersService");
	}

	synchronized public void start(int sharding) {

	}

	@Override
	public void execute(ShardingContext shardingContext) {
		// TODO Auto-generated method stub
		log.info("shardingContext:{}", shardingContext.getShardingItem());
		readConfig();
		start(1);
	}
}
public class MyDataFlowJob implements DataflowJob<User> { @Override public List<User> fetchData(ShardingContext shardingContext) { List<User> users = null;//查询users from db
        return users; } @Override public void processData(ShardingContext shardingContext, List<User> data) { for (User user: data) { user.setStatus(1); //update user
 } } }

 

 

3. Spring配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd">
    <!--配置做业注册中心 -->
    <reg:zookeeper id="regCenter" server-lists="localhost:2181" namespace="dd-job"
                   base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
 
    <!-- 配置做业-->
    <job:simple id="orderStatisticsJob" class="com.beta.cb.mall.task.job.OrderStatisticsJob" registry-center-ref="regCenter" sharding-total-count="2" cron="0/2 * * * * ?" />
   <job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter"              sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" />
</beans>
相关文章
相关标签/搜索
本站公众号
   欢迎关注本站公众号,获取更多信息