Aloha 是一个基于 Scala 实现的分布式的任务调度和管理框架,提供插件式扩展功能,能够用来调度各类类型的任务。Aloha 的典型的应用场景是做为统一的任务管理入口。例如,在数据平台上一般会运行各类类型的应用,如 Spark 任务,Flink 任务,ETL 任务等,统一对这些任务进行管理并及时感知任务状态的变化是颇有必要的。git
Aloha 的基本实现是基于 Spark 的任务调度模块,在 Master 和 Worker 组件的基础上进行了修改,并提供了扩展接口,能够方便地集成各类类型的任务。Master 支持高可用配置及状态恢复,并提供了 REST 接口用于提交任务。github
在 Aloha 中,调度的应用被抽象为 Application 接口。只须要按需实现 Application 接口,就能够对多种不一样类型的应用进行调度管理。Application 的生命周期主要经过 start()
, shutdown()
进行管理,当应用被调度到 worker 上执行时, start()
方法首先被调用,当用户要求强制中止应用时,shutdown()
方法被调用。apache
trait Application {
//启动
def start(): Promise[ExitState]
//强制中止
def shutdown(reason: Option[String]): Unit
//提交应用时的描述
def withDescription(desc: ApplicationDescription): Application
//应用运行时的工做目录
def withApplicationDir(appDir: File): Application
//系统配置
def withAlohaConf(conf: AlohaConf): Application
//应用运行结束后的清理动做
def clean(): Unit
}
复制代码
你可能注意到了,start()
方法的返回值是一个 Promise
对象。这是由于,Aloha 最初在设计时主要针对的是长期运行的应用程序,如 Flink 任务、Spark Streaming 任务等。对于这一类 long-running 的应用,Future 和 Promise 提供了一种更灵活的任务状态通知机制。当任务中止后,经过调用 Promise.success()
方法告知 Worker。网络
例如,若是要经过启动一个独立进程的方式来启动一个应用程序,能够这样来实现:数据结构
override def start(): Promise[ExitState] = {
//启动进程
val processBuilder = getProcessBuilder()
process = processBuilder.start()
stateMonitorThread = new Thread("app-state-monitor-thread") {
override def run(): Unit = {
val exitCode = process.waitFor()
//进程退出
if(exitCode == 0) {
result.success(ExitState(ExitCode.SUCCESS, Some("success")))
} else {
result.success(ExitState(ExitCode.FAILED, Some("failed")))
}
}
}
stateMonitorThread.start()
result
}
override def shutdown(reason: Option[String]): Unit = {
if (process != null) {
//强制结束进程
val exitCode = Utils.terminateProcess(process, APP_TERMINATE_TIMEOUT_MS)
if (exitCode.isEmpty) {
logWarning("Failed to terminate process: " + process +
". This process will likely be orphaned.")
}
}
}
复制代码
在不少状况下,咱们但愿可以实时感知到任务状态的变化,例如在任务完成或者失败时发送一条消息提醒。Aloha 提供了事件监听接口,能够及时对任务状态的变化做出响应。架构
trait AlohaEventListener {
def onApplicationStateChange(event: AppStateChangedEvent): Unit
def onApplicationRelaunched(event: AppRelaunchedEvent): Unit
def onOtherEvent(event: AlohaEvent): Unit
}
复制代码
自定义实现的事件监听器在 Aloha 启动时动态注册,也能够同时注册多个监听器。app
Aloha 的总体实现方案是建构在 Spark 的基础之上,于是 Aloha 也是基于主从架构实现的,主要由 Master 和 Worker 这两个主要组件构成:Master 负责管理集群中全部的 Worker,接收用户提交的应用,并将应用分派给不一样的 Worker;而 Worker 主要是负责启动、关闭具体的应用,对应用的生命周期进行管理等。Aloha 还提供了 REST 服务,实际上充当了 Client 的角色,方便经过 REST 接口提交应用。 框架
在 Master 启动后,等待 Worker 的注册请求。在 Worker 启动时,根据 Master 的地址向 Master 发送注册请求。因为可能会有多个 Master 实例在运行,Worker 会全部的这些Master 都发送注册请求,只有处于 Alive 状态的 Master 会响应注册成功的消息,处于Standby 状态的 Master 会告知 Worker 本身正处于 Standby 状态,Worker 会忽略这一类消息。Worker 会一直尝试向 Master 发送注册请求,直到接收到注册成功的响应。在向 Master 发送注册请求时,请求的消息中会包含当前 Worker 节点的计算资源信息,包括可用的 CPU 数量和内存大小,Master 在进行调度的时候会追踪 Worker 的资源使用状况。异步
一旦 Worker 注册成功,就会周期性地向 Master 发送心跳信息。Master 则会按期检查全部 Worker 的心跳状况,一旦发现过久没有接收到某一个 Worker 的心跳消息,则认为该 Worker 已经下线。另外,网络故障或者进程异常退出等状况会形成 Master 和 Worker 之间创建的网络链接断开,链接断开的事件能直接被 Master 和 Worker 监听到。对 Master 而言,一旦一个 Worker 掉线,须要将该 Worker 上运行的应用置为为异常状态,或是从新调度这些应用。对于 Worker 而言,一旦失去和 Master 创建的链接,就须要从新进入注册流程。async
能够经过两种方式向 Master 提交 Application,一种方式是经过 REST 接口,另外一种方式是自行建立一个 Client,经过 Master 的地址向 Master 发送 RPC 调用。实际上 REST Server 充当了一个 Client 的角色。
当 Master 接收到注册 Application 的请求时,会分配 applicationId,并将应用放到等待调度的列表中。在调度时,采用 FIFO 的方式,选取剩余资源可以知足应用需求的 Worker,向对应的 Worker 发送启动应用的消息,应用从 SUMITTED
状态切换为 LAUNCHING
状态。Worker 在收到启动的应用的请求后,会为对应的应用建立工做目录,并为每个应用单独启动一个工做线程。应用成功启动后会向 Master 发送应用状态改变的消息,应用状态切换为 RUNNING
状态。此后每当应用状态发生改变,例如任务成功完成,或是异常退出,都会向 Master 发送应用状态改变的消息。在应用启动后,对于的工做线程会阻塞地等待应用结束。当 Master 接收到强制中止应用的请求后,会将消息转发给对应的 Worker,Worker 在接收到消息后会中断对应应用的工做线程,工做线程响应中断,调用 Application 提供的强制关闭方法强行中止应用。
为了支持扩展不一样的应用,Worker 在启动应用时使用了自定义的 ClassLoader 去加载应用提供的依赖包和配置文件路径。目前须要预先在每一个 Worker 上放置好对应的文件,并在提交应用时指定路径。后续能够考虑使用一个分布式文件系统,如 HDFS ,在启动应用前下载对应的依赖,或者用户提交应用时上传依赖文件,以免预先放置文件的不便。因为每一个应用的依赖文件都是单独进行加载的,用户能够方便地对应用进行升级,同时也避免了不一样 Application 出现依赖冲突的问题。
因为 Master 负责对整个集群的应用的调度状况进行管理,一旦 Master 出现异常,则整个集群就处于瘫痪的状态,于是必需要考虑为 Master 提供异常恢复机制。
Master 的异常恢复机制的核心流程在于状态的恢复。Master 会将已经注册的 Worker 和 Application的状态信息持久化存储在持久化引擎中(目前支持 FileSystem 和 ZooKeeper,支持扩展),每当 Worker 或者 Application 的状态发生更改,都会更新存储引擎中保存的状态。当 Master 启动时,处于 Standby
状态。一旦 Master 被选举为 Alive
节点,首先要从存储引擎中读取 Worker 和 Application 的状态信息,若是没有历史状态,则 Master 能够变动为 Alive
状态,不然进入恢复流程,状态变动为 RECOVERING
。在恢复流程中,首先要检查 Application 的状态,若是 Application 尚未被调度到任何 Worker 上,则 Application 被放入调度队列,不然将 Application 的状态置为 ApplicationState.UNKNOWN
。随后检查全部 Worker 的状态,将 Worker 置为 WorkerState.UNKNOWN
状态,并尝试向 Worker 发送 MasterChange
的消息。在 Worker 接收到 MasterChange
的消息后,会向 Master 响应目前该 Worker 上运行的全部 Application 的状态,Master 接收到响应后就能够将对应的 Worker 和 Application 分别调整为 WorkerState.ALIVE
和 ApplicationState.RUNNING
。对于超时仍没有获得响应的 Worker 和 Application,则认为已经掉线或异常退出。至此,状态恢复完成,Master 进入 ALIVE
状态,能够正常处理 Worker 和 Application 的各类请求。
在使用 Standalone 模式时,可使用 FILESYSTEM 做为存储引擎,这种状况下只有一个 Master 会运行,失败后须要手动进行重启,重启后状态能够恢复。也能够将 Master 配置为 HA 模式,多个 Master 实例同时运行,使用 ZooKeeper 做为 LeaderElectionAgent 和存储引擎,当 Alive 状态的 Master 失败后会自动选举出新的主节点,并自动进行状态恢复。
Master 在启动时会建立一个事件总线,并注册多个事件监听器,事件监听器能够方便地进行扩展,从而知足不一样的需求。事件总线的核心是一个异步的事件分发机制,基于阻塞队列实现。当接收到新事件时,会将事件分派给事件监听器处理。每当 Master 接收到 Application 状态发生变动的消息时,就会将对应的事件放入事件总线,于是监听器能够及时获取到任务状态的变动事件。
从上一节的介绍能够看出,做为一个分布式的系统,Master 和 Worker 之间存在大量的通讯,这些不一样的组件之间的通讯正是经过 RPC 来实现的。
在 Aloha 中,RPC 模块不一样于传统的 RPC 框架,不须要预先使用 IDL (Interface Description Language) 来定义客户端和服务端进行通讯的数据结构、服务端提供的服务等,而是直接基于 Scala 的模式匹配来完成消息的识别和路由。之因此这样来实现,是由于在这里 RPC 的主要定位是做为内部组件之间通讯的桥梁,无需考虑跨语言等特性。基于 Scala 的模式匹配进行路由下降了代码的复杂度,使用起来很是便捷。
咱们先看一个简单的例子,来了解一下 RPC 的基本使用方法。其核心就在于 RpcEndpoint 的实现。
//------------------------ Server side ----------------------------
object HelloWorldServer {
def main(args: Array[String]): Unit = {
val host = "localhost"
val rpcEnv: RpcEnv = RpcEnv.create("hello-server", host, 52345, new AlohaConf())
val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
rpcEnv.setupEndpoint("hello-service", helloEndpoint)
rpcEnv.awaitTermination()
}
}
class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
override def onStart(): Unit = {
println("Service started.")
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case SayHi(msg) =>
context.reply(s"Aloha: $msg")
case SayBye(msg) =>
context.reply(s"Bye :), $msg")
}
override def onStop(): Unit = {
println("Stop hello endpoint")
}
}
case class SayHi(msg: String)
case class SayBye(msg: String)
//--------------------------- Client side -------------------------------
object HelloWorldClient {
def main(args: Array[String]): Unit = {
val host = "localhost"
val rpcEnv: RpcEnv = RpcEnv.create("hello-client", host, 52345, new AlohaConf, true)
val endPointRef: RpcEndpointRef = rpcEnv.retrieveEndpointRef(RpcAddress("localhost", 52345), "hello-service")
val future: Future[String] = endPointRef.ask[String](SayHi("WALL-E"))
future.onComplete {
case Success(value) => println(s"Got response: $value")
case Failure(e) => println(s"Got error: $e")
}
Await.result(future, Duration.apply("30s"))
}
}
复制代码
从上面的例子很容易观察到,RpcEndpoint
、 RpcEndpointRef
和 RpcEnv
是使用这个 RPC 框架的关键。若是你刚好知道一点 Actor 模型和 Akka 的基本概念,很容易就能把这三个抽象同 Akka 中的 Actor
, ActorRef
和 ActorSystem
联系起来。事实上,Spark 内部的 RPC 最初正是基于 Akka 来实现的,后来虽然剥离了 Akka,但基本的设计理念却保留了下来。
简单地来讲,RpcEndpoint
是一个可以接收消息并做出响应的服务。Master 和 Worker 实际上都是 RpcEndpoint
。
RpcEndpoint
对接收的消息有两种方式,分别对应须要做出应答和不须要做出应答,即:
def receive: PartialFunction[Any, Unit] = {
case _ => throw new AlohaException(self + " does not implement 'receive'")
}
def receiveAndReply(context: RpcCallContext): PartialFunction[Any,Unit] = {
case _ => context.sendFailure(new AlohaException(self + " won't reply anything"))
}
复制代码
其中,RpcCallContext
用于向消息发送方做出应答,包括回复正常的响应以及错误的异常。经过 RpcCallContext
将业务逻辑和数据传输进行了解耦,服务方无需知道请求的发送方是来自本地仍是来自远端。
RpcEndpoint
还包含了一系列生命周期相关的回调方法,如 onStart
, onStop
, onError
, onConnected
, onDisconnected
, onNetworkError
。
RpcEndpointRef
是对 RpcEndpoint
的引用,它是服务调用方发送请求的入口。经过获取 RpcEndpoint
对应的 RpcEndpointRef
,就能够直接向 RpcEndpoint
发送请求。不管 RpcEndpoint
是在本地仍是在远端,向 RpcEndpoint
发送消息的方法都是一致的。这也正是 RPC 存在的意义,即:执行一个远程服务提供的方法,就如同调用本地方法同样。
RpcEndpointRef
提供了以下几种请求的发送方式:
//Sends a one-way asynchronous message. Fire-and-forget semantics.
def send(message: Any): Unit
// Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to receive the reply within the specified timeout.
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
def askSync[T: ClassTag](message: Any):T = askSync(message, defaultAskTimeout)
//Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a specified timeout, throw an exception if this fails.
def askSync[T: ClassTag](message: Any,timeout: RpcTimeout):T = {
val future = ask[T](message, timeout)
timeout.awaitResult(future)
}
复制代码
RpcEnv
是 RpcEndpoint
的运行时环境。一方面,它负责 RpcEndpoint
的注册,RpcEndpoint
生命周期的管理,以及根据 RpcEndpoint
的地址来获取对应 RpcEndpointRef
;另外一方面,它还负责请求的进一步封装,底层数据的网络传输,消息的路由等。
RpcEnv
有两种模式,一种是 Server 模式,一种是 Client 模式。在 Server 模式下,能够向RpcEnv
注册 RpcEndpoint
,而且会注册一个特殊的 Endpoint,即 RpcEndpointVerifier
,在获取 RpcEndpointRef
时,会经过 RpcEndpointVerifier
验证对应的 RpcEndpoint
是否存在。
RpcEnv
经过工厂模式来建立,底层具体的实现方案是可替换的,目前使用的是基于 Netty 实现的 NettyRpcEnv
。
在 NettyRpcEnv
内部,为了高效进行消息的路由与传递,使用了一种相似于 mailbox 的设计。
对于每个 RpcEndpoint
,都有一个关联的 Inbox
,Inbox
内部有一个消息列表,这个消息列表中保存了这个 RpcEndpoint
收到的全部消息,包括须要应答的 RpcMessage
,无需应答的 OneWayMessage
, 以及各类和生命周期相关的状态消息,对于每一条消息,都会调用对应在 RpcEndpoint
内部定义的各类函数进行处理。而 Dispatcher
则充当了消息投递的角色。对于 NettyRpcEnv
接收到的全部消息, Dispatcher
都会根据指定的 Endpoint 标识找到对应的 Inbox
,并将消息投递进去。此外,Dispatcher
内部启动了一个 MessageLoop,这个 MessaLoop 不断从阻塞队列中获取有新消息到达的 Endpoint,不断地消化新到达的这些消息。
和 Inbox
遥相呼应的是,在 NettyRpcEnv
内部维护了 RpcAddress
和 Outbox
的映射关系,每一个远程 Endpoint 都对应一个 Outbox 。在经过 RpcEndpointRef
发送消息时, NettyRpcEnv
会根据 RpcEndpoint
的地址进行判断:若是是本地的 Endpoint, 则直接经过 Dispatcher
进行消息投递;若是是远端的 Endpoint, 则将消息投递到对应的 Outbox
中。 Outbox
中也有一个待投递的消息列表,在首次向远端 Endpoint 投递消息时,会先创建网络链接,而后依次将消息发送出去。
在 NettyRpcEnv
中,如何将请求发送给远端的 Endpoint,并收到远端 Endpoint 给出的回复,这就要要依赖于更底层的网络传输模块。网络传输模块,主要是对 Netty 的更进一步封装,其中关键的组件及功能以下:
TransportServer
: 网络传输的服务端,当 NettyRpcEnv
以 Server 模式启动时就会建立一个 TransportServer
,等待客户端的链接请求TransportClient
:网络传输的客户端,实际上就是对 channel 的进一步封装,一旦网络双方的请求创建成功,那么在 channel 的两端就各有一个 TransportClient
,从而能够以全双工的方式进行数据交换TransportClientFactory
:建立 TransportClient
的工厂类,内部使用了链接池,能够复用已经创建的链接RpcHandler
:负责对接收到的 RPC 请求消息进行处理,NettyRpcEnv
就是在这个接口的方法中将消息交给 Dispatcher
进行投递RpcResponseCallback
:RPC 请求响应的回调接口,NettyRpcEnv
基于这个接口对接收到的数据进行反序列化TransportRequestHandler
:对请求消息进行处理,主要是将消息转交给 RpcHandler
进行处理TransportResponseHandler
:对响应消息进行处理,记录了每一条已发送的消息和与其关联的 RpcResponseCallback
,一旦收到响应,就调用对应的回调方法TransportChannelHandler
:位于 channel pipeline 的尾端,根据消息类型将消息交给 TransportRequestHandler
或 TransportResponseHandler
进行处理TransportContext
:用于建立 TransportServer
和 TransportClientFactory
,并初始化 Netty Channel 的 pipeline其余的诸如引导服务端、引导客户端、消息的编解码等过程,都是使用 Netty 进行网络通讯的惯常流程,这里再也不详述。
Aloha 是一个分布式调度框架 Aloha ,它的实现主要参考了 Spark。文中首先介绍了 Aloha 的使用场景和扩展方式,并采用自顶向下的方式重点介绍了 Aloha 的模块设计和实现方案。
Aloha 现已在 Github 开源,项目地址: github.com/jrthe42/alo… 。有关该项目的任何问题,欢迎各位经过 issue 进行交流。
-EOF-
原文地址: blog.jrwang.me/2019/aloha-… 转载请注明出处!