【Flink】flink 内部 Akka and Actors

使用Akka,全部远程过程调用如今都实现为异步消息。 这主要影响JobManager,TaskManager和JobClient的组件。 未来,甚至可能会将更多的组件转换为参与者,从而容许它们发送和处理异步消息。安全

Akka and the Actor Model

Akka是开发并发,容错和可扩展应用程序的框架。 它是参与者模型的实现,所以相似于Erlang的并发模型。 在参与者模型的上下文中,全部代理实体都被视为独立的参与者。 角色经过彼此发送异步消息与其余角色进行通讯。 参与者模型的优点在于这种异步性。 也能够显式等待响应,以便您执行同步操做。 可是,强烈建议不要使用同步消息,由于它们会限制系统的可伸缩性。 每一个参与者都有一个邮箱,其中存储了收到的消息。 此外,每一个参与者都保持本身的孤立状态。 下面是几个参与者的示例网络。网络

角色具备单个处理线程,该线程轮询角色的邮箱并连续处理接收到的消息。 做为已处理消息的结果,参与者能够更改其内部状态,发送新消息或生成新参与者。 若是一个actor的内部状态是从其处理线程内部专门操纵的,则无需使actor的状态线程安全。 即便单个参与者本质上是顺序的,由多个参与者组成的系统也是高度并发且可扩展的,由于处理线程在全部参与者之间共享。 这种共享也是为何从不该该从参与者线程内部调用阻塞调用的缘由。 这样的调用将阻止该线程被其余参与者用来处理本身的消息。并发

Actor Systems

Actor系统是全部演员生活的容器。 它提供共享服务,例如计划,配置和日志记录。 参与者系统还包含线程池,全部参与者线程都从该线程池中募集。框架

多角色系统能够共存于一台机器上。 若是actor系统以RemoteActorRefProvider启动,则能够从可能驻留在远程计算机上的另外一个actor系统进行访问。 演员系统自动识别演员消息是发给同一个演员系统中仍是远程演员系统中的演员的。 在本地通讯的状况下,可使用共享内存有效地传输消息。 在远程通讯的状况下,消息是经过网络堆栈发送的。异步

全部参与者都按层次结构组织。 每一个新建立的actor都会将其建立actor做为父级分配。 该层次结构用于监督。 每一个父母都有对其子女的监护权。 若是其中一个子项发生错误,则会通知他。 若是演员能够解决问题,那么他能够继续或从新开始孩子。 若是问题超出了他的处理范围,他能够将错误上报给本身的父母。 逐步升级错误仅表示当前层之上的层次结构层如今负责解决问题。分布式

系统建立的第一个参与者由系统提供的守护者参与者/用户监督。 角色层次在此进行了详细说明。ide

Actors in Flink

Actors自己就是状态和行为的容器。 它是actor线程顺序处理传入的消息。 由于一个actor一次仅活动一个线程,因此它使用户摆脱了易于出错的锁定和线程管理任务。 可是,必须确保仅今后参与者线程访问参与者的内部状态。 actor的行为由接收函数定义,该函数为每一个消息包含在接收到此消息时执行的某些逻辑。函数

Flink系统由三个必须通讯的分布式组件组成:JobClient,JobManager和TaskManager。 JobClient从用户那里获取Flink做业,并将其提交给JobManager。 而后JobManager负责编排做业执行。 首先,它分配所需的资源量。 这主要包括TaskManager上的执行插槽。spa

分配资源后,JobManager将做业的各个任务部署到相应的TaskManager中。一旦收到任务,TaskManager会生成一个执行任务的线程。 状态更改(例如开始计算或完成计算)将发送回JobManager。 根据这些状态更新,JobManager将引导做业执行直到完成。 做业完成后,其结果将发送回给JobClient,以告知用户相关信息。 下图描述了做业执行过程。线程

JobManager & TaskManager

JobManager是中央控制单元,负责执行Flink做业。 所以,它控制着资源分配,任务调度和状态报告。

必须先启动一个JobManager和一个或多个TaskManager,而后才能执行任何Flink做业。 而后TaskManager经过向JobManager发送RegisterTaskManager消息在JobManager上注册。 JobManager经过``确认注册''消息确认注册成功。 若是TaskManager已在JobManager上注册,则因为发送了多个RegisterTaskManager消息,则JobManager返回一个AlreadyRegistered消息。 若是注册被拒绝,则JobManager将以RefuseRegistration消息做为响应。

经过向做业管理器发送带有相应JobGraph的SubmitJob消息向做业管理器提交做业。 收到JobGraph后,JobManager将在JobGraph中建立一个ExecutionGraph,做为分布式执行的逻辑表示。 ExecutionGraph包含有关必须执行才能部署到TaskManager的任务的信息。

JobManager的调度程序负责在可用TaskManager上分配执行插槽。 在TaskManager上分配执行插槽后,带有执行任务所需的全部必要信息的SubmitTask消息将发送到相应的TaskManager。 TaskOperationResult确认任务部署成功。 一旦部署并运行了提交做业的源,做业提交也被认为是成功的。 JobManager经过发送带有相应做业ID的成功消息来通知JobClient此状态。

在TaskManager上运行的单个任务的状态更新经过UpdateTaskExecutionState消息发送回JobManager。 使用这些更新消息,能够更新ExecutionGraph以反映执行的当前状态。

JobManager还充当数据源的输入拆分分配器。 它负责在全部TaskManager之间分配工做,以便在可能的状况下保留数据局部性。 为了动态平衡负载,任务在完成对旧输入的处理后,会请求新的输入拆分。 该请求是经过将RequestNextInputSplit发送到JobManager来实现的。 JobManager用NextInputSplit消息响应。 若是没有更多输入拆分,则消息中包含的输入拆分为null。

任务被延迟部署到任务管理器。 这意味着消耗数据的任务仅在其生产者之一完成生产某些数据以后才部署。 生产者执行此操做后,就会将ScheduleOrUpdateConsumers消息发送到JobManager。 此消息代表,消费者如今能够读取新生成的数据。 若是使用任务还没有运行,它将被部署到TaskManager。

JobClient

JobClient表明分布式系统的面向用户的组件。 它用于与JobManager进行通讯,所以它负责提交Flink做业,查询已提交做业的状态并接收当前正在运行的做业的状态消息。

JobClient仍是您经过消息与之通讯的参与者。 存在与做业提交有关的两条消息:SubmitJobDetached和SubmitJobWait。 第一条消息提交做业,并从接收任何状态消息和最终做业结果中注销。 若是您想以丢脸的方式将做业提交到Flink群集,则分离模式很是有用。

SubmitJobWait消息将做业提交到JobManager并注册以接收该做业的状态消息。 在内部,这是经过生成辅助角色来完成的,该辅助角色用做状态消息的接收者。 做业终止后,由JobManager将带有持续时间和累加器结果的JobResultSuccess发送给产生的助手角色。 收到此消息后,辅助角色将消息转发给客户端,该客户端最初发出了SubmitJobWait消息,而后终止。

Asynchronous vs. Synchronous Messages

Flink尽量尝试使用异步消息并将响应做为未来处理。 期货和少数现有的阻塞调用都有一个超时,在此以后该操做将被视为失败。 这样能够防止消息丢失或分布式组件崩溃时系统陷入僵局。 可是,若是您碰巧拥有很是大的群集或缓慢的网络,则可能会错误地触发超时。 所以,能够经过配置中的“ akka.ask.timeout”指定这些操做的超时时间。

演员能够与其余演员交谈以前,必须为其检索ActorRef。 此操做的查找也须要超时。 为了使Actor未启动时系统快速故障,将查找超时设置为比常规超时更小的值。 若是遇到查找超时的状况,能够经过配置中的“ akka.lookup.timeout”来增长查找时间。

Akka的另外一个特色是它设置了能够发送的最大邮件大小的限制。 缘由是它保留了相同大小的序列化缓冲区,而且不想浪费内存。 若是因为消息超出最大大小而遇到传输错误,则能够经过配置中的“ akka.framesize”来增长帧大小。

Failure Detection

分布式系统中的故障检测对其鲁棒性相当重要。 在商品集群上运行时,老是会发生某些组件发生故障或没法再访问的状况。 此类故障的缘由是多态的,从硬件故障到网络中断均可能形成故障。 一个强大的分布式系统应该可以检测出故障的组件并从中恢复。

Flink经过使用Akka的DeathWatch机制来检测故障组件。 即便没有受到该演员的监督,甚至不在另外一个演员系统中,DeathWatch也可让演员观看其余演员。 一旦被观看的演员死亡或没法联系,终止消息就会发送给观看的演员。 所以,在接收到这样的消息时,系统能够针对它采起步骤。 在内部,DeathWatch被实现为心跳和故障检测器,它基于心跳间隔,听音暂停和故障阈值来估计演员什么时候可能死亡。 能够经过在配置中设置“ akka.watch.heartbeat.interval”值来控制心跳间隔。 能够经过“ akka.watch.heartbeat.pause”指定可接受的心跳暂停。 心跳暂停应为心跳间隔的倍数,不然丢失的心跳将直接触发DeathWatch。 能够经过“ akka.watch.threshold”指定故障阈值,它能够有效地控制故障检测器的灵敏度。 您能够在此处找到有关DeathWatch机制和故障检测器的更多详细信息。

在Flink中,JobManager监视全部已注册的TaskManager,而TaskManager监视JobManager。 这样,两个组件都知道什么时候再也不可访问另外一个组件。 JobManager的反应是将各个TaskManager标记为已死,以防止未来的任务部署到该TaskManager。 此外,它将使当前正在此任务管理器上运行的全部任务失败,并在其余TaskManager上从新安排其执行时间。 若是TaskManager仅因暂时的链接丢失而被标记为死,那么一旦从新创建链接,它就能够在JobManager中简单地从新注册本身。

TaskManager还监视JobManager。 此监视容许TaskManager在检测到JobManager失败时经过使全部当前正在运行的任务失败来进入清除状态。 此外,若是触发的死亡仅由网络拥塞或链接丢失引发,TaskManager将尝试从新链接到JobManager。

原文地址

相关文章
相关标签/搜索