Vert.x 蓝图项目已经发布至Vert.x官方网站:Vert.x Blueprint Tutorialsjava
本文章是 Vert.x 蓝图系列 的第二篇教程。全系列:git
欢迎回到Vert.x 蓝图系列~在本教程中,咱们将利用Vert.x开发一个基于消息的应用 - Vert.x Kue,它是一个使用Vert.x开发的优先级工做队列,数据存储使用的是 Redis。Vert.x Kue是 Automattic/kue 的Vert.x实现版本。咱们可使用Vert.x Kue来处理各类各样的任务,好比文件转换、订单处理等等。github
经过本教程,你将会学习到如下内容:web
本教程是Vert.x 蓝图系列的第二篇教程,对应的Vert.x版本为3.3.2。本教程中的完整代码已托管至GitHub。redis
既然咱们要用Vert.x开发一个基于消息的应用,那么咱们先来瞅一瞅Vert.x的消息系统吧~在Vert.x中,咱们能够经过 Event Bus 来发送和接收各类各样的消息,这些消息能够来自不一样的Vertx
实例。怎么样,很酷吧?咱们都将消息发送至Event Bus上的某个地址上,这个地址能够是任意的字符串。算法
Event Bus支持三种消息机制:发布/订阅(Publish/Subscribe)、点对点(Point to point)以及请求/回应(Request-Response)模式。下面咱们就来看一看这几种机制。json
在发布/订阅模式中,消息被发布到Event Bus的某一个地址上,全部订阅此地址的Handler
都会接收到该消息而且调用相应的处理逻辑。咱们来看一看示例代码:后端
EventBus eventBus = vertx.eventBus(); eventBus.consumer("foo.bar.baz", r -> { // subscribe to `foo.bar.baz` address System.out.println("1: " + r.body()); }); eventBus.consumer("foo.bar.baz", r -> { // subscribe to `foo.bar.baz` address System.out.println("2: " + r.body()); }); eventBus.publish("foo.bar.baz", "+1s"); // 向此地址发送消息
咱们能够经过vertx.eventBus()
方法获取EventBus
的引用,而后咱们就能够经过consume
方法订阅某个地址的消息而且绑定一个Handler
。接着咱们经过publish
向此地址发送消息。若是运行上面的例子,咱们会获得一下结果:ruby
2: +1s 1: +1s
若是咱们把上面的示例中的publish
方法替代成send
方法,上面的实例就变成点对点模式了。在点对点模式中,消息被发布到Event Bus的某一个地址上。Vert.x会将此消息传递给其中监听此地址的Handler
之一。若是有多个Handler
绑定到此地址,那么就使用轮询算法随机挑一个Handler
传递消息。好比在此示例中,程序只会打印2: +1s
或者1: +1s
之中的一个。架构
当咱们绑定的Handler
接收到消息的时候,咱们可不能够给消息的发送者回复呢?固然了!当咱们经过send
方法发送消息的时候,咱们能够同时指定一个回复处理函数(reply handler)。而后当某个消息的订阅者接收到消息的时候,它就能够给发送者回复消息;若是发送者接收到了回复,发送者绑定的回复处理函数就会被调用。这就是请求/回应模式。
好啦,如今咱们已经粗略了解了Vert.x中的消息系统 - Event Bus的基本使用,下面咱们就看看Vert.x Kue的基本设计。有关更多关于Event Bus的信息请参考Vert.x Core Manual - Event Bus。
在咱们的项目中,咱们将Vert.x Kue划分为两个模块:
kue-core
: 核心组件,提供优先级队列的功能kue-http
: Web组件,提供Web UI以及REST API另外咱们还提供一个示例模块kue-example
用于演示以及阐述如何使用Vert.x Kue。
既然咱们的项目有两个模块,那么你必定会好奇:两个模块之间是如何进行通讯的?而且若是咱们写本身的Kue应用的话,咱们该怎样去调用Kue Core中的服务呢?不要着急,谜底将在后边的章节中揭晓:-)
回顾一下Vert.x Kue的做用 - 优先级工做队列,因此在Vert.x Kue的核心模块中咱们设计了如下的类:
Job
- 任务(做业)数据实体JobService
- 异步服务接口,提供操做任务以及获取数据的相关逻辑KueWorker
- 用于处理任务的VerticleKue
- 工做队列前边咱们提到过,咱们的两个组件之间须要一种通讯机制能够互相通讯 - 这里咱们使用Vert.x的集群模式,即以clustered的模式来部署Verticle。这样的环境下的Event Bus一样也是集群模式的,所以各个组件能够经过集群模式下的Event Bus进行通讯。很不错吧?在Vert.x的集群模式下,咱们须要指定一个集群管理器ClusterManager
。这里咱们使用默认的HazelcastClusterManager
,使用 Hazelcast 做为集群管理。
在Vert.x Kue中,咱们将JobService
服务发布至分布式的Event Bus上,这样其它的组件就能够经过Event Bus调用该服务了。咱们设计了一个KueVerticle
用于注册服务。Vert.x提供了Vert.x Service Proxy(服务代理组件),能够很方便地将服务注册至Event Bus上,而后在其它地方获取此服务的代理并调用。咱们将在下面的章节中详细介绍Vert.x Service Proxy。
在咱们的Vert.x Kue中,大多数的异步方法都是基于Future
的。若是您看过蓝图系列的第一篇文章的话,您必定不会对这种模式很陌生。在Vert.x 3.3中,咱们的Future
支持基本的响应式的操做,好比map
和compose
。它们用起来很是方便,由于咱们能够将多个Future
以响应式的方式组合起来而不用担忧陷入回调地狱中。
正如咱们在Vert.x Kue 特性介绍中提到的那样,Vert.x Kue支持两种级别的事件:任务事件(job events) 以及 队列事件(queue events)。在Vert.x Kue中,咱们设计了三种事件地址:
vertx.kue.handler.job.{handlerType}.{addressId}.{jobType}
: 某个特定任务的任务事件地址vertx.kue.handler.workers.{eventType}
: (全局)队列事件地址vertx.kue.handler.workers.{eventType}.{addressId}
: 某个特定任务的内部事件地址在特性介绍文档中,咱们提到了如下几种任务事件:
start
开始处理一个任务 (onStart
)promotion
一个延期的任务时间已到,提高至工做队列中 (onPromotion
)progress
任务的进度变化 (onProgress
)failed_attempt
任务处理失败,可是还能够重试 (onFailureAttempt
)failed
任务处理失败而且不能重试 (onFailure
)complete
任务完成 (onComplete
)remove
任务从后端存储中移除 (onRemove
)队列事件也类似,只不过须要加前缀job_
。这些事件都会经过send
方法发送至Event Bus上。每个任务都有对应的任务事件地址,所以它们可以正确地接收到对应的事件并进行相应的处理逻辑。
特别地,咱们还有两个内部事件:done
和done_fail
。done
事件对应一个任务在底层的处理已经完成,而done_fail
事件对应一个任务在底层的处理失败。这两个事件使用第三种地址进行传递。
在Vert.x Kue中,任务共有五种状态:
INACTIVE
: 任务还未开始处理,在工做队列中等待处理ACTIVE
: 任务正在处理中COMPLETE
: 任务处理完成FAILED
: 任务处理失败DELAYED
: 任务延时处理,正在等待计时器时间到并提高至工做队列中咱们使用状态图来描述任务状态的变化:
以及任务状态的变化伴随的事件:
为了让你们对Vert.x Kue的架构有大体的了解,我用一幅图来简略描述整个Vert.x Kue的设计:
如今咱们对Vert.x Kue的设计有了大体的了解了,下面咱们就来看一看Vert.x Kue的代码实现了~
咱们来开始探索Vert.x Kue的旅程吧!首先咱们先从GitHub上clone源代码:
git clone https://github.com/sczyh30/vertx-blueprint-job-queue.git
而后你能够把项目做为Gradle项目导入你的IDE中。(如何导入请参考相关IDE帮助文档)
正如咱们以前所提到的,咱们的Vert.x Kue中有两个功能模块和一个实例模块,所以咱们须要在Gradle工程文件中定义三个子工程。咱们来看一下本项目中的build.gradle
文件:
configure(allprojects) { project -> ext { vertxVersion = "3.3.2" } apply plugin: 'java' repositories { jcenter() } dependencies { compile("io.vertx:vertx-core:${vertxVersion}") compile("io.vertx:vertx-codegen:${vertxVersion}") compile("io.vertx:vertx-rx-java:${vertxVersion}") compile("io.vertx:vertx-hazelcast:${vertxVersion}") compile("io.vertx:vertx-lang-ruby:${vertxVersion}") testCompile("io.vertx:vertx-unit:${vertxVersion}") testCompile group: 'junit', name: 'junit', version: '4.12' } sourceSets { main { java { srcDirs += 'src/main/generated' } } } compileJava { targetCompatibility = 1.8 sourceCompatibility = 1.8 } } project("kue-core") { dependencies { compile("io.vertx:vertx-redis-client:${vertxVersion}") compile("io.vertx:vertx-service-proxy:${vertxVersion}") } jar { archiveName = 'vertx-blueprint-kue-core.jar' from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } manifest { attributes 'Main-Class': 'io.vertx.core.Launcher' attributes 'Main-Verticle': 'io.vertx.blueprint.kue.queue.KueVerticle' } } task annotationProcessing(type: JavaCompile, group: 'build') { // codegen source = sourceSets.main.java classpath = configurations.compile destinationDir = project.file('src/main/generated') options.compilerArgs = [ "-proc:only", "-processor", "io.vertx.codegen.CodeGenProcessor", "-AoutputDirectory=${project.projectDir}/src/main" ] } compileJava { targetCompatibility = 1.8 sourceCompatibility = 1.8 dependsOn annotationProcessing } } project("kue-http") { dependencies { compile(project(":kue-core")) compile("io.vertx:vertx-web:${vertxVersion}") compile("io.vertx:vertx-web-templ-jade:${vertxVersion}") } jar { archiveName = 'vertx-blueprint-kue-http.jar' from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } manifest { attributes 'Main-Class': 'io.vertx.core.Launcher' attributes 'Main-Verticle': 'io.vertx.blueprint.kue.http.KueHttpVerticle' } } } project("kue-example") { dependencies { compile(project(":kue-core")) } jar { archiveName = 'vertx-blueprint-kue-example.jar' from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } manifest { attributes 'Main-Class': 'io.vertx.core.Launcher' attributes 'Main-Verticle': 'io.vertx.blueprint.kue.example.LearningVertxVerticle' } } } task wrapper(type: Wrapper) { gradleVersion = '2.12' }
(⊙o⊙)…比以前的待办事项服务项目中的长很多诶。。。咱们来解释一下:
configure(allprojects)
做用域中,咱们配置了一些全局信息(对全部子工程都适用)。kue-core
、kue-http
以及kue-example
。这里咱们来解释一下里面用到的依赖。在kue-core
中,vertx-redis-client
用于Redis通讯,vertx-service-proxy
用于Event Bus上的服务代理。在kue-http
中,咱们将kue-core
子工程做为它的一个依赖。vertx-web
和vertx-web-templ-jade
用于Kue Web端的开发。annotationProcessing
用于注解处理(Vert.x Codegen)。咱们已经在上一篇教程中介绍过了,这里就不展开讲了。咱们还须要在 settings.gradle
中配置工程:
rootProject.name = 'vertx-blueprint-job-queue' include "kue-core" include "kue-http" include "kue-example"
看完了配置文件之后,咱们再来浏览一下咱们的项目目录结构:
. ├── build.gradle ├── kue-core │ └── src │ ├── main │ │ ├── java │ │ └── resources │ └── test │ ├── java │ └── resources ├── kue-example │ └── src │ ├── main │ │ ├── java │ │ └── resources │ └── test │ ├── java │ └── resources ├── kue-http │ └── src │ ├── main │ │ ├── java │ │ └── resources │ └── test │ ├── java │ └── resources └── settings.gradle
在Gradle中,项目的源码都位于{projectName}/src/main/java
目录内。这篇教程是围绕Vert.x Kue Core的,因此咱们的代码都在kue-core
目录中。
好啦!如今咱们已经对Vert.x Kue项目的总体结构有了大体的了解了,下面咱们开始源码探索之旅!
Vert.x Kue是用来处理任务的,所以咱们先来看一下表明任务实体的Job
类。Job
类位于io.vertx.blueprint.kue.queue
包下。代码可能有点长,不要担忧,咱们把它分红几部分,分别来解析。
咱们先来看一下Job
类中的成员属性:
@DataObject(generateConverter = true) public class Job { // job properties private final String address_id; private long id = -1; private String zid; private String type; private JsonObject data; private Priority priority = Priority.NORMAL; private JobState state = JobState.INACTIVE; private long delay = 0; private int max_attempts = 1; private boolean removeOnComplete = false; private int ttl = 0; private JsonObject backoff; private int attempts = 0; private int progress = 0; private JsonObject result; // job metrics private long created_at; private long promote_at; private long updated_at; private long failed_at; private long started_at; private long duration; // ... }
我去。。。好多属性!咱们一个一个地解释:
address_id
: 一个UUID序列,做为Event Bus的地址id
: 任务的编号(id)type
: 任务的类型data
: 任务携带的数据,以 JsonObject
类型表示priority
: 任务优先级,以 Priority
枚举类型表示。默认优先级为正常(NORMAL
)delay
: 任务的延迟时间,默认是 0state
: 任务状态,以 JobState
枚举类型表示。默认状态为等待(INACTIVE
)attempts
: 任务已经尝试执行的次数max_attempts
: 任务尝试执行次数的最大阈值removeOnComplete
: 表明任务完成时是否自动从后台移除zid
: zset
操做对应的编号(zid),保持先进先出顺序ttl
: TTL(Time to live)backoff
: 任务重试配置,以 JsonObject
类型表示progress
: 任务执行的进度result
: 任务执行的结果,以 JsonObject
类型表示还有这些统计数据:
created_at
: 表明此任务建立的时间promote_at
: 表明此任务从延时状态被提高至等待状态时的时间updated_at
: 表明任务更新的时间failed_at
: 表明任务失败的时间started_at
: 表明任务开始的时间duration
: 表明处理任务花费的时间,单位为毫秒(ms
)你可能注意到在 Job
类中还存在着几个静态成员变量:
private static Logger logger = LoggerFactory.getLogger(Job.class); private static Vertx vertx; private static RedisClient client; private static EventBus eventBus; public static void setVertx(Vertx v, RedisClient redisClient) { vertx = v; client = redisClient; eventBus = vertx.eventBus(); }
对于 logger
对象,我想你们应该都很熟悉,它表明一个Vert.x Logger实例用于日志记录。可是你必定想问为何 Job
类中存在着一个Vertx
类型的静态成员。Job
类不该该是一个数据对象吗?固然咯!Job
类表明一个数据对象,但不只仅是一个数据对象。这里我模仿了一些Automattic/kue的风格,把一些任务相关逻辑方法放到了Job
类里,它们大多都是基于Future
的异步方法,所以能够很方便地去调用以及进行组合变换。好比:
job.save() .compose(Job::updateNow) .compose(j -> j.log("good!"));
因为咱们不能在Job
类被JVM加载的时候就获取Vertx
实例,咱们必须手动给Job
类中的静态Vertx
成员赋值。这里咱们是在Kue
类中对其进行赋值的。当咱们建立一个工做队列的时候,Job
类中的静态成员变量会被初始化。同时为了保证程序的正确性,咱们须要一个方法来检测静态成员变量是否初始化。当咱们在建立一个任务的时候,若是静态成员此时未被初始化,那么日志会给出警告:
private void _checkStatic() { if (vertx == null) { logger.warn("static Vertx instance in Job class is not initialized!"); } }
咱们还注意到 Job
类也是由@DataObject
注解修饰的。Vert.x Codegen能够处理含有@DataObject
注解的类并生成对应的JSON转换器,而且Vert.x Service Proxy也须要数据对象。
在Job
类中咱们有四个构造函数。其中address_id
成员必须在一个任务被建立时就被赋值,默认状况下此地址用一个惟一的UUID字符串表示。每个构造函数中咱们都要调用_checkStatic
函数来检测静态成员变量是否被初始化。
正如咱们以前所提到的那样,咱们经过一个特定的地址vertx.kue.handler.job.{handlerType}.{addressId}.{jobType}
在分布式的Event Bus上发送和接收任务事件(job events)。因此咱们提供了两个用于发送和接收事件的辅助函数emit
和on
(相似于Node.js中的EventEmitter
):
@Fluent public <T> Job on(String event, Handler<Message<T>> handler) { logger.debug("[LOG] On: " + Kue.getCertainJobAddress(event, this)); eventBus.consumer(Kue.getCertainJobAddress(event, this), handler); return this; } @Fluent public Job emit(String event, Object msg) { logger.debug("[LOG] Emit: " + Kue.getCertainJobAddress(event, this)); eventBus.send(Kue.getCertainJobAddress(event, this), msg); return this; }
在后面的代码中,咱们将频繁使用这两个辅助函数。
在咱们探索相关的逻辑函数以前,咱们先来描述一下Vert.x Kue的数据在Redis中是以什么样的形式存储的:
vertx_kue
命名空间下(以vertx_kue:
做为前缀)vertx:kue:job:{id}
: 存储任务实体的mapvertx:kue:ids
: 计数器,指示当前最大的任务IDvertx:kue:job:types
: 存储全部任务类型的列表vertx:kue:{type}:jobs
: 指示全部等待状态下的某种类型任务的列表vertx_kue:jobs
: 存储全部任务zid
的有序集合vertx_kue:job:{state}
: 存储全部指定状态的任务zid
的有序集合vertx_kue:jobs:{type}:{state}
: 存储全部指定状态和类型的任务zid
的有序集合vertx:kue:job:{id}:log
: 存储指定id
的任务对应日志的列表OK,下面咱们就来看看Job
类中重要的逻辑函数。
咱们以前提到过,Vert.x Kue中的任务一共有五种状态。全部的任务相关的操做都伴随着任务状态的变换,所以咱们先来看一下state
方法的实现,它用于改变任务的状态:
public Future<Job> state(JobState newState) { Future<Job> future = Future.future(); RedisClient client = RedisHelper.client(vertx, new JsonObject()); // use a new client to keep transaction JobState oldState = this.state; client.transaction().multi(r0 -> { // (1) if (r0.succeeded()) { if (oldState != null && !oldState.equals(newState)) { // (2) client.transaction().zrem(RedisHelper.getStateKey(oldState), this.zid, _failure()) .zrem(RedisHelper.getKey("jobs:" + this.type + ":" + oldState.name()), this.zid, _failure()); } client.transaction().hset(RedisHelper.getKey("job:" + this.id), "state", newState.name(), _failure()) // (3) .zadd(RedisHelper.getKey("jobs:" + newState.name()), this.priority.getValue(), this.zid, _failure()) .zadd(RedisHelper.getKey("jobs:" + this.type + ":" + newState.name()), this.priority.getValue(), this.zid, _failure()); switch (newState) { // dispatch different state case ACTIVE: // (4) client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()), this.priority.getValue() < 0 ? this.priority.getValue() : -this.priority.getValue(), this.zid, _failure()); break; case DELAYED: // (5) client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()), this.promote_at, this.zid, _failure()); break; case INACTIVE: // (6) client.transaction().lpush(RedisHelper.getKey(this.type + ":jobs"), "1", _failure()); break; default: } this.state = newState; client.transaction().exec(r -> { // (7) if (r.succeeded()) { future.complete(this); } else { future.fail(r.cause()); } }); } else { future.fail(r0.cause()); } }); return future.compose(Job::updateNow); }
首先咱们先建立了一个Future
对象。而后咱们调用了 client.transaction().multi(handler)
函数开始一次Redis事务 (1)。在Vert.x 3.3.2中,全部的Redis事务操做都移至RedisTransaction
类中,因此咱们须要先调用client.transaction()
方法去获取一个事务实例,而后调用multi
表明事务块的开始。
在multi
函数传入的Handler
中,咱们先断定当前的任务状态。若是当前任务状态不为空而且不等于新的任务状态,咱们就将Redis中存储的旧的状态信息移除 (2)。为了方便起见,咱们提供了一个RedisHelper
辅助类,里面提供了一些生成特定地址以及编码解码zid
的方法:
package io.vertx.blueprint.kue.util; import io.vertx.blueprint.kue.queue.JobState; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import io.vertx.redis.RedisClient; import io.vertx.redis.RedisOptions; public final class RedisHelper { private static final String VERTX_KUE_REDIS_PREFIX = "vertx_kue"; private RedisHelper() { } public static RedisClient client(Vertx vertx, JsonObject config) { return RedisClient.create(vertx, options(config)); } public static RedisOptions options(JsonObject config) { return new RedisOptions() .setHost(config.getString("redis.host", "127.0.0.1")) .setPort(config.getInteger("redis.port", 6379)); } public static String getKey(String key) { return VERTX_KUE_REDIS_PREFIX + ":" + key; } public static String getStateKey(JobState state) { return VERTX_KUE_REDIS_PREFIX + ":jobs:" + state.name(); } public static String createFIFO(long id) { String idLen = "" + ("" + id).length(); int len = 2 - idLen.length(); while (len-- > 0) idLen = "0" + idLen; return idLen + "|" + id; } public static String stripFIFO(String zid) { return zid.substring(zid.indexOf('|') + 1); } public static long numStripFIFO(String zid) { return Long.parseLong(zid.substring(zid.indexOf('|') + 1)); } }
全部的key都必须在vertx_kue
命名空间下,所以咱们封装了一个getKey
方法。咱们还实现了createFIFO
和stripFIFO
方法用于生成zid
以及解码zid
。zid
的格式使用了Automattic/Kue中的格式。
回到state
方法来。咱们使用zrem(String key, String member, Handler<AsyncResult<String>> handler)
方法将特定的数据从有序集合中移除。两个key分别是vertx_kue:job:{state}
以及 vertx_kue:jobs:{type}:{state}
;member
对应着任务的zid
。
接下来咱们使用hset
方法来变动新的状态 (3),而后用zadd
方法往vertx_kue:job:{state}
和 vertx_kue:jobs:{type}:{state}
两个有序集合中添加此任务的zid
,同时传递一个权重(score)。这个很是重要,咱们就是经过这个实现优先级队列的。咱们直接使用priority
对应的值做为score
。这样,当咱们须要从Redis中获取任务的时候,咱们就能够经过zpop
方法获取优先级最高的任务。咱们会在后面详细讲述。
不一样的新状态须要不一样的操做。对于ACTIVE
状态,咱们经过zadd
命令将zid
添加至vertx_kue:jobs:ACTIVE
有序集合中并赋予优先级权值 (4)。对于DELAYED
状态,咱们经过zadd
命令将zid
添加至vertx_kue:jobs:DELAYED
有序集合中并赋予提高时间(promote_at
)权值 (5)。对于INACTIVE
状态,咱们向vertx:kue:{type}:jobs
列表中添加一个元素 (6)。这些操做都是在Redis事务块内完成的。最后咱们经过exec
方法一并执行这些事务操做 (7)。若是执行成功,咱们给future
赋值(当前任务)。最后咱们返回future
而且与updateNow
方法相组合。
updateNow
方法很是简单,就是把updated_at
的值设为当前时间,而后存到Redis中:
Future<Job> updateNow() { this.updated_at = System.currentTimeMillis(); return this.set("updated_at", String.valueOf(updated_at)); }
这里咱们来看一下整个Job
类中最重要的方法之一 - save
方法,它的做用是保存任务至Redis中。
public Future<Job> save() { // check Objects.requireNonNull(this.type, "Job type cannot be null"); // (1) if (this.id > 0) return update(); // (2) Future<Job> future = Future.future(); // 生成id client.incr(RedisHelper.getKey("ids"), res -> { // (3) if (res.succeeded()) { this.id = res.result(); this.zid = RedisHelper.createFIFO(id); // (4) String key = RedisHelper.getKey("job:" + this.id); if (this.delay > 0) { this.state = JobState.DELAYED; } client.sadd(RedisHelper.getKey("job:types"), this.type, _failure()); // (5) this.created_at = System.currentTimeMillis(); this.promote_at = this.created_at + this.delay; // 保存任务 client.hmset(key, this.toJson(), _completer(future, this)); // (6) } else { future.fail(res.cause()); } }); return future.compose(Job::update); // (7) }
首先,任务类型不能为空因此咱们要检查type
是否为空 (1)。接着,若是当前任务的id大于0,则表明此任务已经存储过(由于id是存储时分配),此时只需执行更新操做(update
)便可 (2)。而后咱们建立一个Future
对象,而后使用incr
方法从vertx_kue:ids
字段获取一个新的id
(3)。同时咱们使用RedisHelper.createFIFO(id)
方法来生成新的zid
(4)。接着咱们来判断任务延时是否大于0,若大于0则将当前任务状态设置为DELAYED
。而后咱们经过sadd
方法将当前任务类型添加至vertx:kue:job:types
列表中 (5) 而且保存任务建立时间(created_at
)以及任务提高时间(promote_at
)。通过这一系列的操做后,全部的属性都已准备好,因此咱们能够利用hmset
方法将此任务实体存储至vertx:kue:job:{id}
哈希表中 (6)。若是存储操做成功,那么将当前任务实体赋给future
,不然记录错误。最后咱们返回此future
而且将其与update
方法进行组合。
update
方法进行一些更新操做,它的逻辑比较简单:
Future<Job> update() { Future<Job> future = Future.future(); this.updated_at = System.currentTimeMillis(); client.transaction().multi(_failure()) .hset(RedisHelper.getKey("job:" + this.id), "updated_at", String.valueOf(this.updated_at), _failure()) .zadd(RedisHelper.getKey("jobs"), this.priority.getValue(), this.zid, _failure()) .exec(_completer(future, this)); return future.compose(r -> this.state(this.state)); }
能够看到update
方法只作了三件微小的工做:存储任务更新时间、存储zid
以及更改当前任务状态(组合state
方法)。
最后总结一下将一个任务存储到Redis中通过的步骤:save -> update -> state
:-)
移除任务很是简单,借助zrem
和del
方法便可。咱们来看一下其实现:
public Future<Void> remove() { Future<Void> future = Future.future(); client.transaction().multi(_failure()) .zrem(RedisHelper.getKey("jobs:" + this.stateName()), this.zid, _failure()) .zrem(RedisHelper.getKey("jobs:" + this.type + ":" + this.stateName()), this.zid, _failure()) .zrem(RedisHelper.getKey("jobs"), this.zid, _failure()) .del(RedisHelper.getKey("job:" + this.id + ":log"), _failure()) .del(RedisHelper.getKey("job:" + this.id), _failure()) .exec(r -> { if (r.succeeded()) { this.emit("remove", new JsonObject().put("id", this.id)); future.complete(); } else { future.fail(r.cause()); } }); return future; }
注意到成功移除任务时,咱们会向Event Bus上的特定地址发送remove
任务事件。此事件包含着被移除任务的id
。
咱们能够经过几种 onXXX
方法来监放任务事件:
@Fluent public Job onComplete(Handler<Job> completeHandler) { this.on("complete", message -> { completeHandler.handle(new Job((JsonObject) message.body())); }); return this; } @Fluent public Job onFailure(Handler<JsonObject> failureHandler) { this.on("failed", message -> { failureHandler.handle((JsonObject) message.body()); }); return this; } @Fluent public Job onFailureAttempt(Handler<JsonObject> failureHandler) { this.on("failed_attempt", message -> { failureHandler.handle((JsonObject) message.body()); }); return this; } @Fluent public Job onPromotion(Handler<Job> handler) { this.on("promotion", message -> { handler.handle(new Job((JsonObject) message.body())); }); return this; } @Fluent public Job onStart(Handler<Job> handler) { this.on("start", message -> { handler.handle(new Job((JsonObject) message.body())); }); return this; } @Fluent public Job onRemove(Handler<JsonObject> removeHandler) { this.on("start", message -> { removeHandler.handle((JsonObject) message.body()); }); return this; } @Fluent public Job onProgress(Handler<Integer> progressHandler) { this.on("progress", message -> { progressHandler.handle((Integer) message.body()); }); return this; }
注意到不一样的事件,对应接收的数据类型也有差别。咱们来讲明一下:
onComplete
、onPromotion
以及 onStart
: 发送的数据是对应的Job
对象onFailure
and onFailureAttempt
: 发送的数据是JsonObject
类型的,其格式相似于:{ "job": {}, "extra": { "message": "some_error" } }
onProgress
: 发送的数据是当前任务进度onRemove
: 发送的数据是JsonObject
类型的,其中id
表明被移除任务的编号咱们能够经过progress
方法来更新任务进度。看一下其实现:
public Future<Job> progress(int complete, int total) { int n = Math.min(100, complete * 100 / total); // (1) this.emit("progress", n); // (2) return this.setProgress(n) // (3) .set("progress", String.valueOf(n)) .compose(Job::updateNow); }
progress
方法接受两个参数:第一个是当前完成的进度值,第二个是完成状态须要的进度值。咱们首先计算出当前的进度 (1),而后向特定地址发送progress
事件 (2)。最后咱们将进度存储至Redis中并更新时间,返回Future
(3)。
当一个任务处理失败时,若是它有剩余的重试次数,Vert.x Kue会自动调用failAttempt
方法进行重试。咱们来看一下failAttempt
方法的实现:
Future<Job> failedAttempt(Throwable err) { return this.error(err) .compose(Job::failed) .compose(Job::attemptInternal); }
(⊙o⊙)很是简短吧~实际上,failAttempt
方法是三个异步方法的组合:error
、failed
以及attemptInternal
。当一个任务须要进行重试的时候,咱们首先向Event Bus发布 error
队列事件而且在Redis中记录日志,而后将当前的任务状态置为FAILED
,最后从新处理此任务。
咱们先来看一下error
方法:
public Future<Job> error(Throwable ex) { return this.emitError(ex) .set("error", ex.getMessage()) .compose(j -> j.log("error | " + ex.getMessage())); }
它的逻辑很简单:首先咱们向Event Bus发布 错误 事件,而后记录错误日志便可。这里咱们封装了一个发布错误的函数emitError
:
@Fluent public Job emitError(Throwable ex) { JsonObject errorMessage = new JsonObject().put("id", this.id) .put("message", ex.getMessage()); eventBus.publish(Kue.workerAddress("error"), errorMessage); eventBus.send(Kue.getCertainJobAddress("error", this), errorMessage); return this; }
其中发送的错误信息格式相似于下面的样子:
{ "id": 2052, "message": "some error" }
接下来咱们再来看一下failed
方法的实现:
public Future<Job> failed() { this.failed_at = System.currentTimeMillis(); return this.updateNow() .compose(j -> j.set("failed_at", String.valueOf(j.failed_at))) .compose(j -> j.state(JobState.FAILED)); }
很是简单,首先咱们更新任务的更新时间和失败时间,而后经过state
方法将当前任务状态置为FAILED
便可。
任务重试的核心逻辑在attemptInternal
方法中:
private Future<Job> attemptInternal() { int remaining = this.max_attempts - this.attempts; // (1) if (remaining > 0) { // 还有重试次数 return this.attemptAdd() // (2) .compose(Job::reattempt) // (3) .setHandler(r -> { if (r.failed()) { this.emitError(r.cause()); // (4) } }); } else if (remaining == 0) { // (5) return Future.failedFuture("No more attempts"); } else { // (6) return Future.failedFuture(new IllegalStateException("Attempts Exceeded")); } }
在咱们的Job
数据对象中,咱们存储了最大重试次数max_attempts
以及已经重试的次数attempts
,因此咱们首先根据这两个数据计算剩余的重试次数remaining
(1)。若是还有剩余次数的话,咱们就先调用attemptAdd
方法增长一次已重试次数并 (2),而后咱们调用reattempt
方法执行真正的任务重试逻辑 (3)。最后返回这两个异步方法组合的Future
。若是其中一个过程出现错误,咱们就发布error
事件 (4)。若是没有剩余次数了或者超出剩余次数了,咱们直接返回错误。
在咱们解析reattempt
方法以前,咱们先来回顾一下Vert.x Kue中的任务失败恢复机制。Vert.x Kue支持延时重试机制(retry backoff),而且支持不一样的策略(如 fixed 以及 exponential)。以前咱们提到Job
类中有一个backoff
成员变量,它用于配置延时重试的策略。它的格式相似于这样:
{ "type": "fixed", "delay": 5000 }
延时重试机制的实如今getBackoffImpl
方法中,它返回一个Function<Integer, Long>
对象,表明一个接受Integer
类型(即attempts
),返回Long
类型(表明计算出的延时值)的函数:
private Function<Integer, Long> getBackoffImpl() { String type = this.backoff.getString("type", "fixed"); // (1) long _delay = this.backoff.getLong("delay", this.delay); // (2) switch (type) { case "exponential": // (3) return attempts -> Math.round(_delay * 0.5 * (Math.pow(2, attempts) - 1)); case "fixed": default: // (4) return attempts -> _delay; } }
首先咱们从backoff
配置中获取延迟重试策略。目前Vert.x Kue支持两种策略:fixed
和 exponential
。前者采用固定延迟时间,然后者采用指数增加型延迟时间。默认状况下Vert.x Kue会采用fixed
策略 (1)。接下来咱们从backoff
配置中获取延迟时间,若是配置中没有指定,那么就使用任务对象中的延迟时间delay
(2)。接下来就是根据具体的策略进行计算了。对于指数型延迟,咱们计算[delay * 0.5 * 2^attempts]
做为延迟时间 (3);对于固定型延迟策略,咱们直接使用获取到的延迟时间 (4)。
好啦,如今回到“真正的重试”方法 —— reattempt
方法来:
private Future<Job> reattempt() { if (this.backoff != null) { long delay = this.getBackoffImpl().apply(attempts); // (1) return this.setDelay(delay) .setPromote_at(System.currentTimeMillis() + delay) .update() // (2) .compose(Job::delayed); // (3) } else { return this.inactive(); // (4) } }
首先咱们先检查backoff
配置是否存在,若存在则计算出对应的延时时间 (1) 而且设定delay
和promote_at
属性的值而后保存至Redis中 (2)。接着咱们经过delayed
方法将任务的状态设为延时(DELAYED
) (3)。若是延时重试配置不存在,咱们就经过inactive
方法直接将此任务置入工做队列中 (4)。
这就是整个任务重试功能的实现,也不是很复杂蛤?观察上面的代码,咱们能够发现Future
组合无处不在。这种响应式的组合很是方便。想想若是咱们用回调的异步方式来写代码的话,咱们很容易陷入回调地狱中(⊙o⊙)。。。几个回调嵌套起来总显得不是那么优美和简洁,而用响应式的、可组合的Future
就能够有效地避免这个问题。
不错!到如今为止咱们已经探索完Job
类的源码了~下面咱们来看一下JobService
类。
在本章节中咱们来探索一下JobService
接口及其实现 —— 它包含着各类普通的操做和统计Job
的逻辑。
咱们的JobService
是一个通用逻辑接口,所以咱们但愿应用中的每个组件都能访问此服务,即进行RPC。在Vert.x中,咱们能够将服务注册至Event Bus上,而后其它组件就能够经过Event Bus来远程调用注册的服务了。
传统的RPC有一个缺点:消费者须要阻塞等待生产者的回应。你可能想说:这是一种阻塞模型,和Vert.x推崇的异步开发模式不相符。没错!并且,传统的RPC不是真正面向失败设计的。
还好,Vert.x提供了一种高效的、响应式的RPC —— 异步RPC。咱们不须要等待生产者的回应,而只须要传递一个Handler<AsyncResult<R>>
参数给异步方法。这样当收到生产者结果时,对应的Handler
就会被调用,很是方便,这与Vert.x的异步开发模式相符。而且,AsyncResult
也是面向失败设计的。
因此讲到这里,你可能想问:到底怎么在Event Bus上注册服务呢?咱们是否是须要写一大堆的逻辑去包装和发送信息,而后在另外一端解码信息并进行调用呢?不,这太麻烦了!有了Vert.x 服务代理,咱们不须要这么作!Vert.x提供了一个组件 Vert.x Service Proxy 来自动生成服务代理。有了它的帮助,咱们就只须要按照规范设计咱们的异步服务接口,而后用@ProxyGen
注解修饰便可。
@ProxyGen
注解的限制@ProxyGen
注解的使用有诸多限制。好比,全部的异步方法都必须是基于回调的,也就是说每一个方法都要接受一个Handler<AsyncResult<R>>
类型的参数。而且,类型R
也是有限制的 —— 只容许基本类型以及数据对象类型。详情请参考官方文档。
咱们来看一下JobService
的源码:
@ProxyGen @VertxGen public interface JobService { static JobService create(Vertx vertx, JsonObject config) { return new JobServiceImpl(vertx, config); } static JobService createProxy(Vertx vertx, String address) { return ProxyHelper.createProxy(JobService.class, vertx, address); } /** * 获取任务,按照优先级顺序 * * @param id job id * @param handler async result handler */ @Fluent JobService getJob(long id, Handler<AsyncResult<Job>> handler); /** * 删除任务 * * @param id job id * @param handler async result handler */ @Fluent JobService removeJob(long id, Handler<AsyncResult<Void>> handler); /** * 判断任务是否存在 * * @param id job id * @param handler async result handler */ @Fluent JobService existsJob(long id, Handler<AsyncResult<Boolean>> handler); /** * 获取任务日志 * * @param id job id * @param handler async result handler */ @Fluent JobService getJobLog(long id, Handler<AsyncResult<JsonArray>> handler); /** * 获取某一范围内某个指定状态下的任务列表 * * @param state expected job state * @param from from * @param to to * @param order range order * @param handler async result handler */ @Fluent JobService jobRangeByState(String state, long from, long to, String order, Handler<AsyncResult<List<Job>>> handler); /** * 获取某一范围内某个指定状态和类型下的任务列表 * * @param type expected job type * @param state expected job state * @param from from * @param to to * @param order range order * @param handler async result handler */ @Fluent JobService jobRangeByType(String type, String state, long from, long to, String order, Handler<AsyncResult<List<Job>>> handler); /** * 获取某一范围内的任务列表(按照顺序或倒序) * * @param from from * @param to to * @param order range order * @param handler async result handler */ @Fluent JobService jobRange(long from, long to, String order, Handler<AsyncResult<List<Job>>> handler); // 统计函数 /** * 获取指定状态和类型下的任务的数量 * * @param type job type * @param state job state * @param handler async result handler */ @Fluent JobService cardByType(String type, JobState state, Handler<AsyncResult<Long>> handler); /** * 获取某个状态下的任务的数量 * * @param state job state * @param handler async result handler */ @Fluent JobService card(JobState state, Handler<AsyncResult<Long>> handler); /** * 获取COMPLETE状态任务的数量 * * @param type job type; if null, then return global metrics * @param handler async result handler */ @Fluent JobService completeCount(String type, Handler<AsyncResult<Long>> handler); /** * 获取FAILED状态任务的数量 * * @param type job type; if null, then return global metrics */ @Fluent JobService failedCount(String type, Handler<AsyncResult<Long>> handler); /** * 获取INACTIVE状态任务的数量 * * @param type job type; if null, then return global metrics */ @Fluent JobService inactiveCount(String type, Handler<AsyncResult<Long>> handler); /** * 获取ACTIVE状态任务的数量 * * @param type job type; if null, then return global metrics */ @Fluent JobService activeCount(String type, Handler<AsyncResult<Long>> handler); /** * 获取DELAYED状态任务的数量 * * @param type job type; if null, then return global metrics */ @Fluent JobService delayedCount(String type, Handler<AsyncResult<Long>> handler); /** * 获取当前存在的全部任务类型 * * @param handler async result handler */ @Fluent JobService getAllTypes(Handler<AsyncResult<List<String>>> handler); /** * 获取指定状态下的全部任务的ID * * @param state job state * @param handler async result handler */ @Fluent JobService getIdsByState(JobState state, Handler<AsyncResult<List<Long>>> handler); /** * 工做队列运行时间(ms) * * @param handler async result handler */ @Fluent JobService getWorkTime(Handler<AsyncResult<Long>> handler); }
能够看到咱们还为JobService
接口添加了@VertxGen
注解,Vert.x Codegen能够处理此注解生成多种语言版本的服务。
在JobService
接口中咱们还定义了两个静态方法:create
用于建立一个任务服务实例,createProxy
用于建立一个服务代理。
JobService
接口中包含一些任务操做和统计的相关逻辑,每一个方法的功能都已经在注释中阐述了,所以咱们就直接来看它的实现吧~
JobService
接口的实现位于JobServiceImpl
类中,代码很是长,所以这里就不贴代码了。。。你们能够对照GitHub中的代码读下面的内容。
getJob
: 获取任务的方法很是简单。直接利用hgetall
命令从Redis中取出对应的任务便可。removeJob
: 咱们能够将此方法看做是getJob
和Job#remove
两个方法的组合。existsJob
: 使用exists
命令判断对应id
的任务是否存在。getJobLog
: 使用lrange
命令从vertx_kue:job:{id}:log
列表中取出日志。rangeGeneral
: 使用zrange
命令获取必定范围内的任务,这是一个通用方法。
zrange
操做zrange
返回某一有序集合中某个特定范围内的元素。详情请见ZRANGE - Redis。
如下三个方法复用了rangeGeneral
方法:
jobRangeByState
: 指定状态,对应的key为vertx_kue:jobs:{state}
。jobRangeByType
: 指定状态和类型,对应的key为vertx_kue:jobs:{type}:{state}
。jobRange
: 对应的key为vertx_kue:jobs
。这两个通用方法用于任务数量的统计:
cardByType
: 利用zcard
命令获取某一指定状态和类型下任务的数量。card
: 利用zcard
命令获取某一指定状态下任务的数量。下面五个辅助统计方法复用了上面两个通用方法:
completeCount
failedCount
delayedCount
inactiveCount
activeCount
接着看:
getAllTypes
: 利用smembers
命令获取vertx_kue:job:types
集合中存储的全部的任务类型。getIdsByState
: 使用zrange
获取某一指定状态下全部任务的ID。getWorkTime
: 使用get
命令从vertx_kue:stats:work-time
中获取Vert.x Kue的工做时间。既然完成了JobService
的实现,接下来咱们来看一下如何利用Service Proxy将服务注册至Event Bus上。这里咱们还须要一个KueVerticle
来建立要注册的服务实例,而且将其注册至Event Bus上。
打开io.vertx.blueprint.kue.queue.KueVerticle
类的源码:
package io.vertx.blueprint.kue.queue; import io.vertx.blueprint.kue.service.JobService; import io.vertx.blueprint.kue.util.RedisHelper; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; import io.vertx.core.json.JsonObject; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.redis.RedisClient; import io.vertx.serviceproxy.ProxyHelper; public class KueVerticle extends AbstractVerticle { private static Logger logger = LoggerFactory.getLogger(Job.class); public static final String EB_JOB_SERVICE_ADDRESS = "vertx.kue.service.job.internal"; // (1) private JsonObject config; private JobService jobService; @Override public void start(Future<Void> future) throws Exception { this.config = config(); this.jobService = JobService.create(vertx, config); // (2) // create redis client RedisClient redisClient = RedisHelper.client(vertx, config); redisClient.ping(pr -> { // (3) test connection if (pr.succeeded()) { logger.info("Kue Verticle is running..."); // (4) register job service ProxyHelper.registerService(JobService.class, vertx, jobService, EB_JOB_SERVICE_ADDRESS); future.complete(); } else { logger.error("oops!", pr.cause()); future.fail(pr.cause()); } }); } }
首先咱们须要定义一个地址用于服务注册 (1)。在start
方法中,咱们建立了一个任务服务实例 (2),而后经过ping
命令测试Redis链接 (3)。若是链接正常,那么咱们就能够经过ProxyHelper
类中的registerService
辅助方法来将服务实例注册至Event Bus上 (4)。
这样,一旦咱们在集群模式下部署KueVerticle
,服务就会被发布至Event Bus上,而后咱们就能够在其余组件中去远程调用此服务了。很奇妙吧!
Kue
类表明着工做队列。咱们来看一下Kue
类的实现。首先先看一下其构造函数:
public Kue(Vertx vertx, JsonObject config) { this.vertx = vertx; this.config = config; this.jobService = JobService.createProxy(vertx, EB_JOB_SERVICE_ADDRESS); this.client = RedisHelper.client(vertx, config); Job.setVertx(vertx, RedisHelper.client(vertx, config)); // init static vertx instance inner job }
这里咱们须要注意两点:第一点,咱们经过createProxy
方法来建立一个JobService
的服务代理;第二点,以前提到过,咱们须要在这里初始化Job
类中的静态成员变量。
咱们的JobService
是基于回调的,这是服务代理组件所要求的。为了让Vert.x Kue更加响应式,使用起来更加方便,咱们在Kue
类中以基于Future的异步模式封装了JobService
中的全部异步方法。这很简单,好比这个方法:
@Fluent JobService getJob(long id, Handler<AsyncResult<Job>> handler);
能够这么封装:
public Future<Optional<Job>> getJob(long id) { Future<Optional<Job>> future = Future.future(); jobService.getJob(id, r -> { if (r.succeeded()) { future.complete(Optional.ofNullable(r.result())); } else { future.fail(r.cause()); } }); return future; }
其实就是加一层Future
。其它的封装过程也相似因此咱们就不细说了。
process
和processBlocking
方法用于处理任务:
public Kue process(String type, int n, Handler<Job> handler) { if (n <= 0) { throw new IllegalStateException("The process times must be positive"); } while (n-- > 0) { processInternal(type, handler, false); }f setupTimers(); return this; } public Kue process(String type, Handler<Job> handler) { processInternal(type, handler, false); setupTimers(); return this; } public Kue processBlocking(String type, int n, Handler<Job> handler) { if (n <= 0) { throw new IllegalStateException("The process times must be positive"); } while (n-- > 0) { processInternal(type, handler, true); } setupTimers(); return this; }
两个process
方法都相似 —— 它们都是使用Event Loop线程处理任务的,其中第一个方法还能够指定同时处理任务数量的阈值。咱们来回顾一下使用Event Loop线程的注意事项 —— 咱们不能阻塞Event Loop线程。所以若是咱们须要在处理任务时作一些耗时的操做,咱们可使用processBlocking
方法。这几个方法的代码看起来都差很少,那么区别在哪呢?以前咱们提到过,咱们设计了一种Verticle - KueWorker
,用于处理任务。所以对于process
方法来讲,KueWorker
就是一种普通的Verticle;而对于processBlocking
方法来讲,KueWorker
是一种Worker Verticle。这两种Verticle有什么不一样呢?区别在于,Worker Verticle会使用Worker线程,所以即便咱们执行一些耗时的操做,Event Loop线程也不会被阻塞。
建立及部署KueWorker
的逻辑在processInternal
方法中,这三个方法都使用了processInternal
方法:
private void processInternal(String type, Handler<Job> handler, boolean isWorker) { KueWorker worker = new KueWorker(type, handler, this); // (1) vertx.deployVerticle(worker, new DeploymentOptions().setWorker(isWorker), r0 -> { // (2) if (r0.succeeded()) { this.on("job_complete", msg -> { long dur = new Job(((JsonObject) msg.body()).getJsonObject("job")).getDuration(); client.incrby(RedisHelper.getKey("stats:work-time"), dur, r1 -> { // (3) if (r1.failed()) r1.cause().printStackTrace(); }); }); } }); }
首先咱们建立一个KueWorker
实例 (1)。咱们将在稍后详细介绍KueWorker
的实现。而后咱们根据提供的配置来部署此KueWorker
(2)。processInternal
方法的第三个参数表明此KueWorker
是否为worker verticle。若是部署成功,咱们就监听complete
事件。每当接收到complete
事件的时候,咱们获取收到的信息(处理任务消耗的时间),而后用incrby
增长对应的工做时间 (3)。
再回到前面三个处理方法中。除了部署KueWorker
之外,咱们还调用了setupTimers
方法,用于设定定时器以监测延时任务以及监测活动任务TTL。
Vert.x Kue支持延时任务,所以咱们须要在任务延时时间到达时将任务“提高”至工做队列中等待处理。这个工做是在checkJobPromotion
方法中实现的:
private void checkJobPromotion() { int timeout = config.getInteger("job.promotion.interval", 1000); // (1) int limit = config.getInteger("job.promotion.limit", 1000); // (2) vertx.setPeriodic(timeout, l -> { // (3) client.zrangebyscore(RedisHelper.getKey("jobs:DELAYED"), String.valueOf(0), String.valueOf(System.currentTimeMillis()), new RangeLimitOptions(new JsonObject().put("offset", 0).put("count", limit)), r -> { // (4) if (r.succeeded()) { r.result().forEach(r1 -> { long id = Long.parseLong(RedisHelper.stripFIFO((String) r1)); this.getJob(id).compose(jr -> jr.get().inactive()) // (5) .setHandler(jr -> { if (jr.succeeded()) { jr.result().emit("promotion", jr.result().getId()); // (6) } else { jr.cause().printStackTrace(); } }); }); } else { r.cause().printStackTrace(); } }); }); }
首先咱们从配置中获取监测延时任务的间隔(job.promotion.interval
,默认1000ms)以及提高数量阈值(job.promotion.limit
,默认1000)。而后咱们使用vertx.setPeriodic
方法设一个周期性的定时器 (3),每隔一段时间就从Redis中获取须要被提高的任务 (4)。这里咱们经过zrangebyscore
获取每一个须要被提高任务的id
。咱们来看一下zrangebyscore
方法的定义:
RedisClient zrangebyscore(String key, String min, String max, RangeLimitOptions options, Handler<AsyncResult<JsonArray>> handler);
key
: 某个有序集合的key,即vertx_kue:jobs:DELAYED
min
and max
: 最小值以及最大值(按照某种模式)。这里min
是0,而max
是当前时间戳咱们来回顾一下Job
类中的state
方法。当咱们要把任务状态设为DELAYED
的时候,咱们将score设为promote_at
时间:
case DELAYED: client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()), this.promote_at, this.zid, _failure());
所以咱们将max
设为当前时间(System.currentTimeMillis()
),只要当前时间超过须要提高的时间,这就说明此任务能够被提高了。
options
: range和limit配置。这里咱们须要指定LIMIT
值因此咱们用new RangeLimitOptions(new JsonObject().put("offset", 0).put("count", limit)
建立了一个配置zrangebyscore
的结果是一个JsonArray
,里面包含着全部等待提高任务的zid
。得到结果后咱们就将每一个zid
转换为id
,而后分别获取对应的任务实体,最后对每一个任务调用inactive
方法来将任务状态设为INACTIVE
(5)。若是任务成功提高至工做队列,咱们就发送promotion
事件 (6)。
咱们知道,Vert.x支持多种语言(如JS,Ruby),所以若是能让咱们的Vert.x Kue支持多种语言那固然是极好的!这没有问题~Vert.x Codegen能够处理含@VertxGen
注解的异步接口,生成多语言版本。@VertxGen
注解一样限制异步方法 —— 须要基于回调,所以咱们设计了一个CallbackKue
接口用于提供多语言支持。CallbackKue
的设计很是简单,其实现复用了Kue
和jobService
的代码。你们能够直接看源码,一目了然,这里就不细说了。
注意要生成多语言版本的代码,须要添加相应的依赖。好比要生成Ruby版本的代码就要向build.gradle
中添加compile("io.vertx:vertx-lang-ruby:${vertxVersion}")
。
好啦,咱们已经对Vert.x Kue Core的几个核心部分有了大体的了解了,如今是时候探索一下任务处理的本源 - KueWorker
了~
每个worker都对应一个特定的任务类型,而且绑定着特定的处理函数(Handler
),因此咱们须要在建立的时候指定它们。
在KueWorker
中,咱们使用prepareAndStart
方法来准备要处理的任务而且开始处理任务的过程:
private void prepareAndStart() { this.getJobFromBackend().setHandler(jr -> { // (1) if (jr.succeeded()) { if (jr.result().isPresent()) { this.job = jr.result().get(); // (2) process(); // (3) } else { this.emitJobEvent("error", null, new JsonObject().put("message", "job_not_exist")); throw new IllegalStateException("job not exist"); } } else { this.emitJobEvent("error", null, new JsonObject().put("message", jr.cause().getMessage())); jr.cause().printStackTrace(); } }); }
代码比较直观。首先咱们经过getJobFromBackend
方法从Redis中按照优先级顺序获取任务 (1)。若是成功获取任务,咱们就把获取到的任务保存起来 (2) 而后经过process
方法处理任务 (3)。若是中间出现错误,咱们须要发送error
错误事件,其中携带错误信息。
咱们来看一下咱们是如何从Redis中按照优先级顺序获取任务实体的:
private Future<Optional<Job>> getJobFromBackend() { Future<Optional<Job>> future = Future.future(); client.blpop(RedisHelper.getKey(this.type + ":jobs"), 0, r1 -> { // (1) if (r1.failed()) { client.lpush(RedisHelper.getKey(this.type + ":jobs"), "1", r2 -> { if (r2.failed()) future.fail(r2.cause()); }); } else { this.zpop(RedisHelper.getKey("jobs:" + this.type + ":INACTIVE")) // (2) .compose(kue::getJob) // (3) .setHandler(r -> { if (r.succeeded()) { future.complete(r.result()); } else future.fail(r.cause()); }); } }); return future; }
以前咱们已经了解到,每当咱们保存一个任务的时候,咱们都会向vertx_kue:{type}:jobs
列表中插入一个新元素表示新的任务可供处理。所以这里咱们经过blpop
命令来等待可用的任务 (1)。一旦有任务可供处理,咱们就利用zpop
方法取出高优先级的任务的zid
(2)。zpop
命令是一个原子操做,用于从有序集合中弹出最小score值的元素。注意Redis没有实现zpop
命令,所以咱们须要本身实现。
Redis官方文档介绍了一种实现zpop
命令的简单方法 - 利用 WATCH
。这里咱们利用另一种思路实现zpop
命令:
private Future<Long> zpop(String key) { Future<Long> future = Future.future(); client.transaction() .multi(_failure()) .zrange(key, 0, 0, _failure()) .zremrangebyrank(key, 0, 0, _failure()) .exec(r -> { if (r.succeeded()) { JsonArray res = r.result(); if (res.getJsonArray(0).size() == 0) // empty set future.fail(new IllegalStateException("Empty zpop set")); else { try { future.complete(Long.parseLong(RedisHelper.stripFIFO( res.getJsonArray(0).getString(0)))); } catch (Exception ex) { future.fail(ex); } } } else { future.fail(r.cause()); } }); return future; }
在咱们的zpop
的实现中,咱们首先开始了一个事务块,而后依次执行zrange
和zremrangebyrank
命令。有关这些命令的详情咱们就不细说了,能够参考Redis官方文档。而后咱们提交事务,若是提交成功,咱们会得到一个JsonArray
类型的结果。正常状况下咱们均可以经过res.getJsonArray(0).getString(0)
获取到对应的zid
值。获取到zid
值之后咱们就能够将其转换为任务的id
了,最后咱们将id
置于Future
内(由于zpop
也是一个异步方法)。
接着回到getJobFromBackend
方法中。获取到对应的id
以后,咱们就能够经过Kue
的getJob
函数获取任务实体了 (3)。因为getJobFromBackend
也是一个异步方法,所以咱们一样将结果置于Future
中。
前边讲了那么多,都是在为处理任务作准备。。。不要着急,如今终于到了真正的“处理”逻辑咯!咱们看一下process
方法的实现:
private void process() { long curTime = System.currentTimeMillis(); this.job.setStarted_at(curTime) .set("started_at", String.valueOf(curTime)) // (1) set start time .compose(Job::active) // (2) set the job state to ACTIVE .setHandler(r -> { if (r.succeeded()) { Job j = r.result(); // emit start event this.emitJobEvent("start", j, null); // (3) emit job `start` event // (4) process logic invocation try { jobHandler.handle(j); } catch (Exception ex) { j.done(ex); } // (5) consume the job done event eventBus.consumer(Kue.workerAddress("done", j), msg -> { createDoneCallback(j).handle(Future.succeededFuture( ((JsonObject) msg.body()).getJsonObject("result"))); }); eventBus.consumer(Kue.workerAddress("done_fail", j), msg -> { createDoneCallback(j).handle(Future.failedFuture( (String) msg.body())); }); } else { this.emitJobEvent("error", this.job, new JsonObject().put("message", r.cause().getMessage())); r.cause().printStackTrace(); } }); }
到了最核心的函数了!首先咱们先给开始时间赋值 (1) 而后将任务状态置为ACTIVE
(2)。若是这两个操做成功的话,咱们就向Event Bus发送任务开始(start
)事件 (3)。接下来咱们调用真正的处理逻辑 - 以前绑定的jobHandler
(4)。若是处理过程当中抛出异常的话,Vert.x Kue就会调用job.done(ex)
方法发送done_fail
内部事件来通知worker任务处理失败。可是彷佛没有看到在哪里接收并处理done
和done_fail
事件呢?就在这 (5)!一旦Vert.x Kue接收到这两个事件,它就会调用对应的handler
去进行任务完成或失败的相应操做。这里的handler
是由createDoneCallback
方法生成的:
private Handler<AsyncResult<JsonObject>> createDoneCallback(Job job) { return r0 -> { if (job == null) { return; } if (r0.failed()) { this.fail(r0.cause()); // (1) return; } long dur = System.currentTimeMillis() - job.getStarted_at(); job.setDuration(dur) .set("duration", String.valueOf(dur)); // (2) JsonObject result = r0.result(); if (result != null) { job.setResult(result) .set("result", result.encodePrettily()); // (3) } job.complete().setHandler(r -> { // (4) if (r.succeeded()) { Job j = r.result(); if (j.isRemoveOnComplete()) { // (5) j.remove(); } this.emitJobEvent("complete", j, null); // (6) this.prepareAndStart(); // (7) 准备处理下一个任务 } }); }; }
任务处理有两种状况:完成和失败,所以咱们先来看任务成功处理的状况。咱们首先给任务的用时(duration
)赋值 (2),而且若是任务产生告终果,也给结果(result
)赋值 (3)。而后咱们调用job.complete
方法将状态设置为COMPLETE
(4)。若是成功的话,咱们就检查removeOnComplete
标志位 (5) 并决定是否将任务从Redis中移除。而后咱们向Event Bus发送任务完成事件(complete
)以及队列事件job_complete
(6)。如今这个任务的处理过程已经结束了,worker须要准备处理下一个任务了,所以最后咱们调用prepareAndStart
方法准备处理下一个Job
。
人生不如意事十之八九,任务处理过程当中极可能会碰见各类各样的问题而失败。当任务处理失败时,咱们调用KueWorker
中的fail
方法:
private void fail(Throwable ex) { job.failedAttempt(ex).setHandler(r -> { // (1) if (r.failed()) { this.error(r.cause(), job); // (2) } else { Job res = r.result(); if (res.hasAttempts()) { // (3) this.emitJobEvent("failed_attempt", job, new JsonObject().put("message", ex.getMessage())); } else { this.emitJobEvent("failed", job, new JsonObject().put("message", ex.getMessage())); // (4) } prepareAndStart(); // (5) } }); }
面对失败时,咱们首先经过failedAttempt
方法尝试从错误中恢复 (1)。若是恢复失败(好比没有重试次数了)就向Event Bus发送error
队列事件 (2)。若是恢复成功,咱们就根据是否还有剩余重试次数来发送对应的事件(failed
或者failed_attempt
)。搞定错误之后,worker一样须要准备处理下一个任务了,所以最后咱们调用prepareAndStart
方法准备处理下一个Job
(5)。
这就是KueWorker
的所有实现,是否是颇有趣呢?看了这么久的代码也有些累了,下面是时候来写个Kue应用跑一下咯~
在io.vertx.blueprint.kue.example
包下(kue-example
子工程)建立一个LearningVertxVerticle
类,而后编写以下代码:
package io.vertx.blueprint.kue.example; import io.vertx.blueprint.kue.Kue; import io.vertx.blueprint.kue.queue.Job; import io.vertx.blueprint.kue.queue.Priority; import io.vertx.core.AbstractVerticle; import io.vertx.core.json.JsonObject; public class LearningVertxVerticle extends AbstractVerticle { @Override public void start() throws Exception { // 建立工做队列 Kue kue = Kue.createQueue(vertx, config()); // 监听全局错误事件 kue.on("error", message -> System.out.println("[Global Error] " + message.body())); JsonObject data = new JsonObject() .put("title", "Learning Vert.x") .put("content", "core"); // 准备学习Vert.x,爽! Job j = kue.createJob("learn vertx", data) .priority(Priority.HIGH) .onComplete(r -> { // 完成任务事件监听 System.out.println("Feeling: " + r.getResult().getString("feeling", "none")); }).onFailure(r -> { // 任务失败事件监听 System.out.println("eee...so difficult..."); }).onProgress(r -> { // 任务进度变动事件监听 System.out.println("I love this! My progress => " + r); }); // 保存任务 j.save().setHandler(r0 -> { if (r0.succeeded()) { // 开始学习! kue.processBlocking("learn vertx", 1, job -> { job.progress(10, 100); // 3秒速成 vertx.setTimer(3000, r1 -> { job.setResult(new JsonObject().put("feeling", "amazing and wonderful!")) // 结果 .done(); // 完成啦! }); }); } else { System.err.println("Wow, something happened: " + r0.cause().getMessage()); } }); } }
一般状况下,一个Vert.x Kue应用能够分为几部分:建立工做队列、建立任务、保存任务以及处理任务。咱们推荐开发者把应用写成Verticle
的形式。
在这个例子中,咱们要模拟一个学习Vert.x的任务!首先咱们经过Kue.createQueue
方法建立一个工做队列而且经过on(error, handler)
方法监听全局错误(error
)事件。接着咱们经过kue.createJob
方法建立学习任务,将优先级设定为HIGH
,而且监听complete
、failed
以及progress
事件。而后咱们须要保存任务,保存完毕之后咱们就能够经过processBlocking
方法来执行耗时任务了。在处理逻辑中,咱们首先经过job.progress
方法将进度设为10
,而后使用vertx.setTimer
方法设一个3秒的定时器,定时器时间到之后赋予结果并完成任务。
像往常同样,咱们还须要在build.gradle
中配置一下。咱们须要将kue-example
子工程中的Main-Verticle
属性设为刚才写的io.vertx.blueprint.kue.example.LearningVertxVerticle
:
project("kue-example") { dependencies { compile(project(":kue-core")) } jar { archiveName = 'vertx-blueprint-kue-example.jar' from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } manifest { attributes 'Main-Class': 'io.vertx.core.Launcher' attributes 'Main-Verticle': 'io.vertx.blueprint.kue.example.LearningVertxVerticle' } } }
好了,到了展现时间了!打开终端,构建项目:
gradle build
固然不要忘记运行Redis:
redis-server
而后咱们先运行Vert.x Kue Core部分:
java -jar kue-core/build/libs/vertx-blueprint-kue-core.jar -cluster -ha -conf config/config.json
而后再运行咱们的实例:
java -jar kue-example/build/libs/vertx-blueprint-kue-example.jar -cluster -ha -conf config/config.json
这时终端应该会依次显示输出:
INFO: Kue Verticle is running... I love this! My progress => 10 Feeling: amazing and wonderful!
固然你也能够在Vert.x Kue的Web端查看任务状况。
棒极了!咱们终于结束了咱们的Vert.x Kue Core探索之旅~~!从这篇超长的教程中,你学到了如何利用Vert.x去开发一个基于消息的应用!太酷了!
若是想了解kue-http
的实现,请移步Vert.x 蓝图 | Vert.x Kue 教程(Web部分)。若是想了解更多的关于Vert.x Kue的特性,请移步Vert.x Kue 特性介绍。
Vert.x能作的不只仅是这些。想要了解更多的关于Vert.x的知识,请参考Vert.x 官方文档 —— 这永远是资料最齐全的地方。
My Blog: 「千载弦歌,芳华如梦」 - sczyh30's blog