Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。java
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。python
官网地址:elasticjob.io/git
Github:github.com/elasticjob/…github
目前咱们公司用的是基于Linux Crontab的定时任务执行器。web
存在以下问题:shell
除了Linux Crontab在java这块的方案还有 Quartz,但 Quartz缺乏分布式并行调度的功能。bash
存在的问题也很明显:服务器
这种状况下可能须要本身去开发一个可以知足公司业务需求的调度框架,成本较高,不推荐微信
以前我也有想过要本身写一个,思路有了,就是还没开始,调度框架只要是调度问题,像Elastic-Job就作的很是好,它把分片的规则让你本身定义,而后根据你定的片的数据给你调度下,至于每一个节点处理什么数据你本身去控制。架构
若是说部采用这种方式,也不去写数据的分发,那么我以为最简单的办法就是用消息队列来实现了。
采用zookeeper来作调度,存储任务数据,定义一个通用的接口,分红2部分,以下:
public interface Job {
void read();
void process(Object data);
}
复制代码
而后使用者经过实现上面的接口来读取须要处理的数据,在process中处理分发过来的数据
至于分发的话一个任务能够经过注解来标记使用一个队列,也可使用通用的,这样就能够实现多个消费者同时消费了,就算其中一个挂掉也不影响整个任务,也不用考虑失效转移了。
可是要作控制的是read方法,必须只有一个节点执行,否则数据就分发重复了。
上面只是提供一个简单的思路,固然有web页面管理任务,也能够手动执行任务等等。
TBSchedule:阿里早期开源的分布式任务调度系统。代码略陈旧,使用timer而非线程池执行任务调度。众所周知,timer在处理异常情况时是有缺陷的。并且TBSchedule做业类型较为单一,只能是获取/处理数据一种模式。还有就是文档缺失比较严重。
Spring Batch: Spring Batch是一个轻量级的,彻底面向Spring的批处理框架,能够应用于企业级大量的数据处理系统。Spring Batch以POJO和你们熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch能够提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,做业处理统计工做从新启动、跳过,和资源管理等重要功能。
Elastic-Job:国内开源产品,中文文档,入门快速,使用简单,功能齐全,社区活跃,由当当网架构师张亮主导,目前在开源方面投入了比较多的时间。
Simple:简单做业,经常使用, 意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口类似,但提供了弹性扩缩容和分片等功能。
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
复制代码
DataFlow:Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
public class MyElasticJob implements DataflowJob<Foo> {
@Override
public List<Foo> fetchData(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
List<Foo> data = // get data from database by sharding item 0
return data;
case 1:
List<Foo> data = // get data from database by sharding item 1
return data;
case 2:
List<Foo> data = // get data from database by sharding item 2
return data;
// case n: ...
}
}
@Override
public void processData(ShardingContext shardingContext, List<Foo> data) {
// process data
// ...
}
}
复制代码
Script:Script类型做业意为脚本类型做业,支持shell,python,perl等全部类型脚本。只需经过控制台或代码配置scriptCommandLine便可,无需编码。执行脚本路径可包含参数,参数传递完毕后,做业框架会自动追加最后一个参数为做业运行时信息。
在特定的业务需求下,A任务执行完以后,须要执行B任务,以此类推,这种具备依赖性的流水式的任务。
在目前能够将这些任务合在一块儿,经过代码调用的方式来达到效果。
但我但愿能增长这样一个功能,好比加一个配置,job-after="com.xxx.job.XXXJob" 在执行完这个任务以后,自动调用另外一个任务BB,BB任务只须要配置任务信息,把cron去掉就能够,由于BB是依靠别的任务触发执行的。
固然这些任务必须在同一个zk的命名空间下,若是能支持夸命名空间就更好了
这样就能达到,流水式的任务操做了,而且每一个任务能够用不一样的分片key
复制代码
demo地址:github.com/elasticjob/…
/**
* 用户维度统计任务<br>统计出用户的房产,置换,贷款等信息
* @author yinjihuan
*/
public class UserStatJob implements SimpleJob {
private Logger logger = LoggerFactory.getLogger(UserStatJob.class);
@Autowired
private EnterpriseProductUserService enterpriseProductUserService;
@Autowired
private UserStatService userStatService;
@Autowired
private HouseInfoService houseInfoService;
@Autowired
private HouseSubstitutionService houseSubstitutionService;
@Autowired
private LoanApplyService loanApplyService;
@Override
public void execute(ShardingContext shardingContext) {
logger.info("开始执行UserStatJob");
long total = enterpriseProductUserService.queryCount();
int pages = PageBean.calcPages(total, 1000);
for (int i = 1; i <= pages; i++) {
List<EnterpriseProductUser> users = enterpriseProductUserService.queryByPage(i, 1000);
for (EnterpriseProductUser user : users) {
try {
processStat(user);
} catch (Exception e) {
logger.error("用户维度统计任务异常", e);
DingDingMessageUtil.sendTextMessage("用户维度统计任务异常:" + e.getMessage());
}
}
}
logger.info("UserStatJob执行结束");
}
private void processStat(EnterpriseProductUser user) {
UserStat stat = userStatService.getByUid(user.getEid(), user.getUid());
Long eid = user.getEid();
String uid = user.getUid();
if (stat == null) {
stat = new UserStat();
stat.setEid(eid);
stat.setUid(uid);
stat.setUserAddTime(user.getAddTime());
stat.setCity(user.getCity());
stat.setRegion(user.getRegion());
}
stat.setHouseCount(houseInfoService.queryCountByEidAndUid(eid, uid));
stat.setHousePrice(houseInfoService.querySumMoneyByEidAndUid(eid, uid));
stat.setSubstitutionCount(houseSubstitutionService.queryCount(eid, uid));
stat.setSubstitutionMaxPrice(houseSubstitutionService.queryMaxBudget(eid, uid));
stat.setLoanEvalCount(loanApplyService.queryUserCountByType(eid, uid, 2));
stat.setLoanEvalMaxPrice(loanApplyService.queryMaxEvalMoney(eid, uid));
stat.setLoanCount(loanApplyService.queryUserCountByType(eid, uid, 1));
stat.setModifyDate(new Date());
userStatService.save(stat);
}
}
复制代码
<!-- 用户统计任务 天天1点10分执行 -->
<job:simple id="userStatJob" class="com.fangjia.job.fsh.job.UserStatJob" registry-center-ref="regCenter"
sharding-total-count="1" cron="0 10 1 * * ?" sharding-item-parameters=""
failover="true" description="【房生活】用户维度统计任务,统计出用户的房产,置换,贷款等信息 UserStatJob"
overwrite="true" event-trace-rdb-data-source="elasticJobLog" job-exception-handler="com.fangjia.job.fsh.handler.CustomJobExceptionHandler">
<job:listener class="com.fangjia.job.fsh.listener.MessageElasticJobListener"></job:listener>
</job:simple>
复制代码
/**
* 做业监听器, 执行先后发送钉钉消息进行通知
* @author yinjihuan
*/
public class MessageElasticJobListener implements ElasticJobListener {
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
String date = DateUtils.date2Str(new Date());
String msg = date + " 【FSH-" + shardingContexts.getJobName() + "】任务开始执行====" + JsonUtils.toJson(shardingContexts);
DingDingMessageUtil.sendTextMessage(msg);
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
String date = DateUtils.date2Str(new Date());
String msg = date + " 【FSH-" + shardingContexts.getJobName() + "】任务执行结束====" + JsonUtils.toJson(shardingContexts);
DingDingMessageUtil.sendTextMessage(msg);
}
}
复制代码
能够在每一个任务类上定义一个注解,注解用来标识这个任务是谁开发的,而后对应的钉钉消息就发送给谁,我我的建议仍是建一个群,而后你们都在里面,由于若是单独发给一个开发人员,除非他的主动性很高,否则也没什么用,我我的建议发在群里,这样领导看见了就会说那个谁谁谁,你的任务报错了,去查下缘由。我这边是统一发的,没有定义注解。
任务的异常处理,能够在任务中对异常进行处理,除了记录日志,也用统一封装好的发送钉钉消息来进行通知,实时知道任务是否有异常,能够查看我上面的代码。
还有一种是没捕获的异常,怎么通知到群里,能够自定义异常处理类来实现, 经过配置job-exception-handler="com.fangjia.job.fsh.handler.CustomJobExceptionHandler"
/**
* 自定义异常处理,在任务异常时使用钉钉发送通知
* @author yinjihuan
*/
public class CustomJobExceptionHandler implements JobExceptionHandler {
private Logger logger = LoggerFactory.getLogger(CustomJobExceptionHandler.class);
@Override
public void handleException(String jobName, Throwable cause) {
logger.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
DingDingMessageUtil.sendTextMessage("【"+jobName+"】任务异常。" + cause.getMessage());
}
}
复制代码
能够经过监听job_name\instances\job_instance_id节点是否存在来判断做业节点是否挂掉,该节点为临时节点,若是做业服务器下线,该节点将删除。固然也能够经过其余的工具来进行监控。
任务的编写尽可能考虑到水平扩展性,像我上面贴的那个列子其实就没考虑到,只是一个单纯的任务,由于我没有用到shardingParameter来处理对应的片的数据,这边其实建议你们考虑下,若是任务时间短。处理的数据少,能够写成我这样。若是可以预计到将来有大量数据须要处理,并且时间很长的话最好配置下分片的规则,而且将代码写成按分片来处理,这样到了后面就直接修改配置,增长下节点就好了。
更多技术分享请关注微信公众号:猿天地