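This article starts from an exception that often shows up while debugging, "TimeoutException: Heartbeat of TaskManager timed out", and uses it to analyze Flink's heartbeat mechanism. The code shown is based on Flink 1.10.
If you debug Flink often, you know the pattern: you stop at a breakpoint, study the stack and the variables, and sink into thought. When you finally spot the likely problem and happily resume the program, it can no longer run and reports the following: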
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 93aa1740-cd2c-4032-b74a-5f256edb3217 timed out.
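That is frustrating, and as programmers we should not have to put up with it. Since the exception message mentions Heartbeat, let's look at Flink's heartbeat mechanism together and see whether there is something we can adjust.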
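Flink has four core components: Dispatcher, JobMaster, ResourceManager and TaskExecutor.
These four components communicate with each other through RPC.
Flink's underlying RPC is built on Akka. Akka is a framework for building concurrent, fault-tolerant and scalable applications. It is an implementation of the Actor Model and closely resembles Erlang's concurrency model. In the actor model every entity is an independent actor, and actors communicate with other actors by sending asynchronous messages.
The power of the actor model comes from asynchrony. An actor can also wait explicitly for a response, which makes synchronous operations possible, but synchronous messaging is strongly discouraged because it limits the scalability of the system.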
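The difference between asynchronous sends and an explicit wait can be sketched in plain Java without Akka. This is only an illustration: `CounterActor`, `tellIncrement` and `askCount` are hypothetical names, and a single-threaded executor stands in for the actor's mailbox.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical mini-actor: one mailbox thread processes messages one at a time.
class CounterActor {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private int count = 0; // only ever touched by the mailbox thread

    // "tell": fire-and-forget, the caller does not wait for the result.
    void tellIncrement() {
        mailbox.execute(() -> count++);
    }

    // "ask": returns a future that completes with the reply.
    CompletableFuture<Integer> askCount() {
        return CompletableFuture.supplyAsync(() -> count, mailbox);
    }

    void shutdown() {
        mailbox.shutdown();
    }

    // Sends n asynchronous increments, then asks once and blocks for the answer.
    static int demo(int n) {
        CounterActor actor = new CounterActor();
        try {
            for (int i = 0; i < n; i++) {
                actor.tellIncrement();
            }
            // join() is the explicit, synchronous wait the text warns about.
            return actor.askCount().join();
        } finally {
            actor.shutdown();
        }
    }
}
```

Because the mailbox handles messages in submission order, the ask is answered only after all earlier increments have been applied; the caller blocks only at the single `join()`.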
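The purpose of RPC is to make a remote call look like a local method call, hiding the asynchronous message exchange underneath.
Flink builds its low-level communication system on Akka and adds RPC on top of it. Nodes call back into each other through gateways, which hides the details of the communication components and decouples them. The main parts of Flink's communication framework are RpcEndpoint, RpcService, RpcServer, AkkaInvocationHandler and AkkaRpcActor.
The main RPC-related interfaces are as follows.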
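RpcEndpoint is the base class for RPC endpoints in Flink. Every distributed component that offers remote procedure calls must extend RpcEndpoint; its functionality is backed by an RpcService.
Among Flink's core components, four kinds subclass RpcEndpoint: Dispatcher, JobMaster, ResourceManager and TaskExecutor. In other words, these are the four core components that need RPC and therefore have RPC capability.
Each RpcEndpoint corresponds to a path (determined jointly by the endpointId and the actorSystem). Each path corresponds to one actor, which implements the RpcGateway interface.
RpcService is a member of RpcEndpoint. It provides the endpoint with RPC services and connects it to remote servers. It has a single implementation, AkkaRpcService (so Flink's communication layer is still Akka for now).
An RpcService is used to start an RpcEndpoint and to connect to one. Connecting to an RPC server returns an RpcGateway, which can be used to call remote procedures.
Because Dispatcher, JobMaster, ResourceManager and TaskExecutor are all RpcEndpoint implementations, constructing any of them requires an RpcService up front. For example, the first constructor parameter of JobMaster is the RpcService.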
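Flink's RPC protocol is defined through RpcGateway. As noted above, talking to a remote actor requires its address (IP and port). In Flink-on-YARN mode, for example, the JobMaster starts its ActorSystem first, at a point when the TaskExecutor containers have not been allocated yet; to communicate with a TaskExecutor later, the TaskExecutor must supply its address.
Dispatcher, JobMaster, ResourceManager and TaskExecutor each implement a gateway in one way or another. JobMaster, for example, implements the JobMasterGateway interface. Each component class holds, as member variables, the gateways of the other components it needs to talk to, and makes its RPC calls through those gateways.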
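How a gateway can hide the transport can be sketched with a JDK dynamic proxy. This is a simplified illustration of the idea, not Flink's AkkaInvocationHandler: `DemoGateway`, `GatewayProxyDemo` and the `outbox` list are hypothetical, and the "message" is just a string added to a list instead of being sent to a remote actor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical gateway interface, standing in for something like TaskExecutorGateway.
interface DemoGateway {
    void heartbeatFromResourceManager(String resourceId);
}

class GatewayProxyDemo {
    // Builds a proxy whose method calls are recorded as "messages" instead of
    // being executed locally; a real handler would ship them to a remote actor.
    static DemoGateway connect(List<String> outbox) {
        InvocationHandler handler = (proxy, method, args) -> {
            outbox.add(method.getName() + "(" + args[0] + ")");
            return null; // the only gateway method is void
        };
        return (DemoGateway) Proxy.newProxyInstance(
            DemoGateway.class.getClassLoader(),
            new Class<?>[] {DemoGateway.class},
            handler);
    }

    // The caller only sees the interface; the proxy turns the call into a message.
    static List<String> demo() {
        List<String> outbox = new ArrayList<>();
        DemoGateway gateway = connect(outbox);
        gateway.heartbeatFromResourceManager("rm-1");
        return outbox;
    }
}
```

The calling code depends only on the gateway interface, which is the decoupling described above.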
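Two kinds of heartbeat detection are common:
Flink implements the second of these: a heartbeat implemented at the application level, on top of its own RPC layer.
Flink's heartbeat code lives in: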
Flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat
It contains four interfaces:
HeartbeatListener.java HeartbeatManager.java HeartbeatTarget.java HeartbeatMonitor.java
and the following classes:
HeartbeatManagerImpl.java HeartbeatManagerSenderImpl.java HeartbeatMonitorImpl.java HeartbeatServices.java NoOpHeartbeatManager.java
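A Flink cluster has several roles, such as ResourceManager, TaskManager and JobManager, and each has its own heartbeat handling. The heartbeat package only provides the interfaces and the basic machinery; what to do with a heartbeat is implemented by each role itself.
Let's first assume that the heartbeat system has two kinds of nodes: senders and receivers. Sender and receiver check on each other, but the check is always initiated by the sender. The sender actively sends a request to probe whether the receiver is alive. Because that request has arrived, the receiver also knows the sender is alive, and the receiver then answers with a heartbeat to show that it is alive too.
Several of Flink's terms differ from the usual notions, so the flow needs to be read with some care.
HeartbeatTarget is the abstraction of a monitored target. In behavioral terms the heartbeat mechanism has two actions: requesting a heartbeat and responding to one.
HeartbeatTarget declares one method for each of these two actions.
Their parameters are simple: the ID of the side the call originates from, and a payload. For a given node, all payloads it receives are of one type and all payloads it sends are of one type.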
    public interface HeartbeatTarget<I> {

        /**
         * Sends a heartbeat response to the target.
         * @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported.
         */
        // heartbeatOrigin is the Receiver
        void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);

        /**
         * Requests a heartbeat from the target.
         * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.
         */
        // requestOrigin is the Sender
        void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);
    }
HeartbeatMonitor wraps a HeartbeatTarget, so that the manager operates on a target through its monitor. The implementing class is covered in detail later.

    public interface HeartbeatMonitor<O> {
        // Gets heartbeat target.
        HeartbeatTarget<O> getHeartbeatTarget();

        // Gets heartbeat target id.
        ResourceID getHeartbeatTargetId();

        // Report heartbeat from the monitored target.
        void reportHeartbeat();

        // Cancel this monitor.
        void cancel();

        // Gets the last heartbeat.
        long getLastHeartbeat();
    }
HeartbeatManager manages the heartbeat mechanism: starting to monitor a HeartbeatTarget, stopping, and reporting on it. The interface extends HeartbeatTarget.
Besides the methods inherited from HeartbeatTarget, it declares four methods:

    public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
        void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);
        void unmonitorTarget(ResourceID resourceID);
        void stop();
        long getLastHeartbeatFrom(ResourceID resourceId);
    }
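HeartbeatListener is the interface that user logic implements to handle heartbeat results. It can be viewed as the output of the heartbeat service, and it declares three callbacks.

    public interface HeartbeatListener<I, O> {
        void notifyHeartbeatTimeout(ResourceID resourceID);
        void reportPayload(ResourceID resourceID, I payload);
        O retrievePayload(ResourceID resourceID);
    }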
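Sender and Receiver were mentioned earlier; two classes, HeartbeatManagerSenderImpl and HeartbeatManagerImpl, correspond to these two roles.
A few key questions first.
How is a heartbeat timeout detected?
Once the heartbeat service is running, each monitor schedules a timeout task through a ScheduledFuture. The task runs only when the configured heartbeat timeout has elapsed.
If a heartbeat from the monitored component arrives within the timeout, the pending task is cancelled and a new one is scheduled, which pushes the timeout back.
If no heartbeat arrives within the timeout, the task runs and the listener is told that the monitored component has timed out.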
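The cancel-and-reschedule idea can be sketched with the JDK's ScheduledExecutorService. This is a simplified stand-in, not Flink's HeartbeatMonitorImpl: `TimeoutMonitor`, its constructor arguments and the 100 ms timeout are assumptions made for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for a heartbeat monitor (hypothetical class).
class TimeoutMonitor {
    private final ScheduledExecutorService executor;
    private final long timeoutMs;
    private final Runnable onTimeout;
    private ScheduledFuture<?> futureTimeout;

    TimeoutMonitor(ScheduledExecutorService executor, long timeoutMs, Runnable onTimeout) {
        this.executor = executor;
        this.timeoutMs = timeoutMs;
        this.onTimeout = onTimeout;
        reportHeartbeat(); // arm the first timeout
    }

    // A heartbeat arrived: cancel the pending timeout task and schedule a fresh one.
    synchronized void reportHeartbeat() {
        cancel();
        futureTimeout = executor.schedule(onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Stop monitoring: the pending timeout task will not run.
    synchronized void cancel() {
        if (futureTimeout != null) {
            futureTimeout.cancel(false);
        }
    }

    // Returns whether the timeout callback fired within the waiting window.
    static boolean fires(boolean cancelFirst) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            CountDownLatch fired = new CountDownLatch(1);
            TimeoutMonitor monitor = new TimeoutMonitor(executor, 100, fired::countDown);
            if (cancelFirst) {
                monitor.cancel();          // nothing should fire afterwards
            } else {
                monitor.reportHeartbeat(); // one heartbeat, then silence
            }
            return fired.await(cancelFirst ? 400 : 5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

As long as `reportHeartbeat()` keeps being called more often than the timeout, the callback never runs; once calls stop, it runs one timeout period after the last heartbeat.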
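When do the two sides start checking each other?
The heartbeat check is bidirectional. One side (the Sender) actively issues heartbeat requests, and the other side (the Receiver) responds to them. The two call each other over RPC, and each call resets the timeout task in the other side's monitor.
Take JobMaster (JM) and TaskManager (TM) as an example. When the JM starts, it begins a periodic schedule that sends heartbeat requests to every TM registered with it by calling the TM's requestHeartbeat method over RPC. That call resets the TM's timeout task for the JM, indicating that the JM is healthy. Once its requestHeartbeat has been called, the TM calls the JM's receiveHeartbeat over RPC, which resets the JM's timeout task for that TM and indicates that the TM is healthy.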
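The round trip can be simulated in-process. The sketch below is hypothetical and heavily simplified: `HbTarget` and `HbNode` are invented names, plain method calls replace RPC, and a counter stands in for "the timeout task was reset".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified version of the two heartbeat actions.
interface HbTarget {
    void requestHeartbeat(String requestOrigin);
    void receiveHeartbeat(String heartbeatOrigin);
}

class HbNode implements HbTarget {
    final String id;
    final Map<String, HbTarget> peers = new HashMap<>();
    final Map<String, Integer> resets = new HashMap<>(); // timeout resets per peer

    HbNode(String id) {
        this.id = id;
    }

    void monitor(String peerId, HbTarget peer) {
        peers.put(peerId, peer);
        resets.put(peerId, 0);
    }

    // Receiver side: a request proves the sender is alive, so reset its timeout
    // and answer with a heartbeat of our own.
    @Override
    public void requestHeartbeat(String requestOrigin) {
        HbTarget sender = peers.get(requestOrigin);
        if (sender != null) {
            resets.merge(requestOrigin, 1, Integer::sum);
            sender.receiveHeartbeat(id);
        }
    }

    // Sender side: the response proves the receiver is alive.
    @Override
    public void receiveHeartbeat(String heartbeatOrigin) {
        if (peers.containsKey(heartbeatOrigin)) {
            resets.merge(heartbeatOrigin, 1, Integer::sum);
        }
    }

    // One period of the sender's schedule: probe every monitored peer.
    void sendRound() {
        for (HbTarget peer : peers.values()) {
            peer.requestHeartbeat(id);
        }
    }

    // Runs the given number of rounds between a "JM" sender and a "TM" receiver and
    // returns {resets of the JM timeout in the TM, resets of the TM timeout in the JM}.
    static int[] demo(int rounds) {
        HbNode jm = new HbNode("JM");
        HbNode tm = new HbNode("TM");
        jm.monitor("TM", tm);
        tm.monitor("JM", jm);
        for (int i = 0; i < rounds; i++) {
            jm.sendRound();
        }
        return new int[] {tm.resets.get("JM"), jm.resets.get("TM")};
    }
}
```

Each round started by the sender resets exactly one timeout on each side, which is why only the sender needs a periodic schedule.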
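How is a heartbeat timeout handled?
The heartbeat service relies on HeartbeatListener. If no heartbeat response arrives within the timeout, the timeout task fires and calls HeartbeatListener.notifyHeartbeatTimeout, which then either tries to reconnect or simply closes the connection.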
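Here is an overview, using the RM and a TM as the example:
RM: implements ResourceManagerGateway (so it can be called directly over RPC).
TM: implements TaskExecutorGateway (so it can be called directly over RPC).
RM: has a Sender HM (heartbeat manager), taskManagerHeartbeatManager, which holds the user-defined TaskManagerHeartbeatListener.
TM: has a Receiver HM, resourceManagerHeartbeatManager, which holds the user-defined ResourceManagerHeartbeatListener.
A HeartbeatManager keeps a ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> named heartbeatTargets.
For every TM it needs to monitor, the RM creates a HeartbeatTarget, which is wrapped in a HeartbeatMonitor and stored in ResourceManager.taskManagerHeartbeatManager.
The monitor of each target owns an asynchronous ScheduledFuture that is cancelled and re-created over and over. If it goes a whole timeout period without being cancelled, the user-defined listener is notified of the timeout.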
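The bookkeeping side of this, one entry per monitored resource in a concurrent map, can be sketched as follows. `MonitorRegistry` is a hypothetical class: it stores only the last heartbeat timestamp per ID instead of a full monitor, and returning -1 for an unknown ID is a convention chosen for this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry keyed by resource ID; the value is the last heartbeat time.
class MonitorRegistry {
    private final ConcurrentHashMap<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Starts monitoring; returns false if the target was already monitored.
    boolean monitorTarget(String resourceId) {
        return lastHeartbeat.putIfAbsent(resourceId, 0L) == null;
    }

    void unmonitorTarget(String resourceId) {
        lastHeartbeat.remove(resourceId);
    }

    // Records a heartbeat, but only for targets that are still monitored.
    void reportHeartbeat(String resourceId, long nowMs) {
        lastHeartbeat.computeIfPresent(resourceId, (id, previous) -> nowMs);
    }

    // Returns -1 for unknown targets (a choice made for this sketch).
    long getLastHeartbeatFrom(String resourceId) {
        return lastHeartbeat.getOrDefault(resourceId, -1L);
    }
}
```

Heartbeats from a resource that was never registered, or was already removed, are ignored rather than creating a new entry.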
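HeartbeatManagerImpl is the concrete implementation of the receiver. It is created by the side that heartbeats are requested from (the Receiver, for example a TM) and accepts heartbeat requests from the initiating side (the Sender, for example the JM). A heartbeat timeout triggers heartbeatListener.notifyHeartbeatTimeout.
Note: on the receiving side, monitoring is triggered passively, only once a heartbeat request has arrived (that is, after requestHeartbeat has been called).
HeartbeatManagerImpl mainly maintains the following:
A map of monitored targets, <ResourceID, HeartbeatMonitor<O>> heartbeatTargets. This is a key-value association.
The key is the ID of the component whose heartbeats are expected (for example a TM); the value is the HeartbeatMonitor created for that component, which fires on a heartbeat timeout. Keys and monitors correspond one to one.
When a heartbeat from the associated machine is received, the state of the corresponding monitor is updated (a new ScheduledFuture is started). When a monitor detects that a heartbeat has timed out, it notifies its HeartbeatListener.
A ScheduledExecutor, mainThreadExecutor, which runs the heartbeat timeout notifications.
A heartbeatListener, which handles heartbeat results.
The fields of HeartbeatManagerImpl are as follows: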
    @ThreadSafe
    public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {

        /** Heartbeat timeout interval in milli seconds. */
        private final long heartbeatTimeoutIntervalMs;

        /** Resource ID which is used to mark one own's heartbeat signals. */
        private final ResourceID ownResourceID;

        /** Heartbeat listener with which the heartbeat manager has been associated. */
        private final HeartbeatListener<I, O> heartbeatListener;

        /** Executor service used to run heartbeat timeout notifications. */
        private final ScheduledExecutor mainThreadExecutor;

        /** Map containing the heartbeat monitors associated with the respective resource ID. */
        private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

        /** Running state of the heartbeat manager. */
        protected volatile boolean stopped;
    }
HeartbeatManagerImpl implements the methods of the HeartbeatManager interface shown earlier.
HeartbeatManagerSenderImpl extends HeartbeatManagerImpl and is created by the side that drives the heartbeat (for example the JM). It implements Runnable, so it can be scheduled as a task. As soon as it is created it starts a periodic schedule; on every run it walks through the heartbeat targets it manages and calls heartbeatTarget.requestHeartbeat on each, asking the target to send back a heartbeat response. This is the active side of the heartbeat.

    public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {

        public void run() {
            if (!stopped) {
                for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
                    requestHeartbeat(heartbeatMonitor);
                }
                // periodic scheduling: re-submit this task after heartbeatPeriod
                getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
            }
        }

        // actively issue a heartbeat request
        private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
            O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
            final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
            // call requestHeartbeat on the target
            heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
        }
    }
HeartbeatMonitorImpl manages one heartbeat target and schedules its timeout on a ScheduledExecutor.

    public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {

        /** Resource ID of the monitored heartbeat target. */
        private final ResourceID resourceID;

        /** Associated heartbeat target. */
        private final HeartbeatTarget<O> heartbeatTarget;

        private final ScheduledExecutor scheduledExecutor;

        /** Listener which is notified about heartbeat timeouts. */
        private final HeartbeatListener<?, ?> heartbeatListener;

        /** Maximum heartbeat timeout interval. */
        private final long heartbeatTimeoutIntervalMs;

        private volatile ScheduledFuture<?> futureTimeout;

        // the state is held in an AtomicReference
        private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

        // time at which the most recent heartbeat was received
        private volatile long lastHeartbeat;

        // report a heartbeat
        public void reportHeartbeat() {
            // remember when the most recent heartbeat arrived
            lastHeartbeat = System.currentTimeMillis();
            // a heartbeat was received, so reset the timeout task
            resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
        }

        // heartbeat timed out: trigger the listener's notifyHeartbeatTimeout
        public void run() {
            // The heartbeat has timed out if we're in state running
            if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
                heartbeatListener.notifyHeartbeatTimeout(resourceID);
            }
        }

        // reset the timeout
        void resetHeartbeatTimeout(long heartbeatTimeout) {
            if (state.get() == State.RUNNING) {
                // cancel the pending task first, then schedule a new one
                cancelTimeout();
                futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);

                // Double check for concurrent accesses (e.g. a firing of the scheduled future)
                if (state.get() != State.RUNNING) {
                    cancelTimeout();
                }
            }
        }

        // ... remaining members omitted in this excerpt
    }
HeartbeatServices creates heartbeat receivers and heartbeat senders; it is the entry point the rest of the runtime uses. As the code shows, the receiver is a HeartbeatManagerImpl and the sender is a HeartbeatManagerSenderImpl:

    public class HeartbeatServices {

        // Creates a heartbeat manager which does not actively send heartbeats.
        public <I, O> HeartbeatManager<I, O> createHeartbeatManager(...) {
            return new HeartbeatManagerImpl<>(...);
        }

        // Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
        public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(...) {
            return new HeartbeatManagerSenderImpl<>(...);
        }
    }
The heartbeat services are created at the cluster entry point. Because we are debugging locally, that happens in MiniCluster.start.

    public void start() throws Exception {
        ......
        heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
        ......
    }
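HeartbeatServices.fromConfiguration reads two settings from the Configuration: the heartbeat interval and the heartbeat timeout.
This is the key to solving the problem we started with: go through the configuration and increase the heartbeat timeout.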
    public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    public static HeartbeatServices fromConfiguration(Configuration configuration) {
        long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
        long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
        return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
    }
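To get a feel for how the two settings relate, here is a small stand-alone sketch. `HeartbeatSettings` is a hypothetical class, and its sanity checks are assumptions made for the illustration rather than code copied from Flink. The sample values 10000 and 50000 are the ones visible in the debugger dumps in this article.

```java
// Hypothetical holder for the two heartbeat settings, in milliseconds.
class HeartbeatSettings {
    final long intervalMs;
    final long timeoutMs;

    HeartbeatSettings(long intervalMs, long timeoutMs) {
        // Assumed sanity checks for this sketch.
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        if (timeoutMs < intervalMs) {
            throw new IllegalArgumentException("timeout must not be smaller than interval");
        }
        this.intervalMs = intervalMs;
        this.timeoutMs = timeoutMs;
    }

    // How many heartbeat requests the sender issues within one timeout window.
    long requestsPerTimeout() {
        return timeoutMs / intervalMs;
    }
}
```

With an interval of 10000 ms and a timeout of 50000 ms, five requests fit into one timeout window, so a process paused at a breakpoint for more than about 50 seconds is declared dead. Raising only the timeout widens that window without changing how often heartbeats are sent.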
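How many ResourceManagers are there? A Flink cluster has exactly one ResourceManager.
How many JobManagers are there? The JobManager manages the execution of jobs. By default each Flink cluster has a single JobManager instance. The JobManager acts as the master node of the cluster and is responsible for task management and resource management across the whole cluster.
How many TaskManagers are there? That depends on how the cluster is started. With Flink on YARN, for instance, session mode lets you specify how many TaskManagers to bring up. In per-job mode the number of TaskManagers is computed dynamically from the parallelism when the job is submitted: number of TMs = parallelism / numberOfTaskSlots. For example, a job with a parallelism of 10 and numberOfTaskSlots set to 1 gets 10 TaskManagers.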
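The per-job formula is plain arithmetic and can be written as a tiny helper. `TaskManagerCount` is a hypothetical name, and rounding up when the division leaves a remainder is an assumption of this sketch; the text above only gives the plain quotient.

```java
// Hypothetical helper for the "parallelism / numberOfTaskSlots" rule.
class TaskManagerCount {
    // Rounds up so that leftover parallel subtasks still get a TaskManager
    // (an assumption of this sketch).
    static int required(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }
}
```

For the example in the text, `TaskManagerCount.required(10, 1)` gives 10; with 4 slots per TaskManager the same job would need 3.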
In Flink, the ResourceManager, JobMaster and TaskExecutor monitor one another through heartbeats.
As explained earlier, HeartbeatManagerSenderImpl is the Sender and HeartbeatManagerImpl is the Receiver.
The ResourceManager sits at the top, so both of its HMs are Senders; it monitors the TaskManagers and the JobManagers.

    public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
            extends FencedRpcEndpoint<ResourceManagerId>
            implements ResourceManagerGateway, LeaderContender {
        // excerpt
        taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(...);
        jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(...);
    }

The JobMaster sits in the middle, with one Sender and one Receiver: it is monitored by the ResourceManager and monitors the TaskManagers.

    public class JobMaster extends FencedRpcEndpoint<JobMasterId>
            implements JobMasterGateway, JobMasterService {
        // excerpt
        taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(...);
        resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(...);
    }

The TaskExecutor sits at the bottom, with two Receivers: it is monitored by both the JM and the RM.

    public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
        // excerpt
        this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(...);
        this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(...);
    }
Take the JobManager and a TaskManager as an example again. When the JM starts, it begins a periodic schedule that sends heartbeat requests to the TMs registered with it by calling the TM's requestHeartbeat over RPC; this resets the TM's timeout task for the JM and indicates that the JM is healthy. After its requestHeartbeat has been called, the TM calls the JM's receiveHeartbeat over RPC, which resets the JM's timeout task for the TM and indicates that the TM is healthy.
During initialization the TM creates two Receiver HMs.

    public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

        /** The heartbeat manager for job manager in the task manager. */
        private final HeartbeatManager<AllocatedSlotReport, AccumulatorReport> jobManagerHeartbeatManager;

        /** The heartbeat manager for resource manager in the task manager. */
        private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;

        // in the constructor
        this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
        this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
    }

When these HeartbeatManagers are created, the ResourceManagerHeartbeatListener and the JobManagerHeartbeatListener are registered with them.
At this point the two HeartbeatManagerImpl instances exist, but according to the description above their monitors only come into play once the JM or the RM has called requestHeartbeat.
The JM creates one Sender HM and one Receiver HM, registering a TaskManagerHeartbeatListener and a ResourceManagerHeartbeatListener.
    public class JobMaster extends FencedRpcEndpoint<JobMasterId>
            implements JobMasterGateway, JobMasterService {

        private HeartbeatManager<AccumulatorReport, AllocatedSlotReport> taskManagerHeartbeatManager;
        private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

        private void startHeartbeatServices() {
            taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
                resourceId,
                new TaskManagerHeartbeatListener(),
                getMainThreadExecutor(),
                log);

            resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
                resourceId,
                new ResourceManagerHeartbeatListener(),
                getMainThreadExecutor(),
                log);
        }
    }
When the ResourceManager starts, its startHeartbeatServices method creates two Sender HeartbeatManagers.
taskManagerHeartbeatManager: a HeartbeatManagerSenderImpl that keeps re-arming a timer, periodically scanning the targets it has to probe and sending them heartbeat requests.
jobManagerHeartbeatManager: also a HeartbeatManagerSenderImpl, which works the same way for the JobManagers.

    taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new TaskManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);

    jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new JobManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);
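We use the interaction between the TM and the RM as the example. After a TaskExecutor starts, it has to register with the RM and with the JM.
The flow is as follows: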
 * 1. Run in Task Manager
 *
 *    TaskExecutor.onStart // Life cycle
 *        |
 *        +----> startTaskExecutorServices@TaskExecutor
 *        |      // start the TM services
 *        |
 *        +----> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
 *        |      // start by connecting to the ResourceManager
 *        |
 *        +----> notifyLeaderAddress@ResourceManagerLeaderListener
 *        |      // called back whenever the RM leader changes
 *        |      // The listener for leader changes of the resource manager.
 *        |
 *        +----> reconnectToResourceManager@TaskExecutor
 *        |      // the next three calls progressively establish contact with the RM
 *        |
 *        +----> tryConnectToResourceManager@TaskExecutor
 *        |
 *        +----> connectToResourceManager()@TaskExecutor
 *        |      // mainly creates the TaskExecutorToResourceManagerConnection
 *        |
 *        +----> start@TaskExecutorToResourceManagerConnection
 *        |      // begins the RPC call; ends up in start of the base class RegisteredRpcConnection
 *        |
 *        +----> start@RegisteredRpcConnection
 *        |      // RegisteredRpcConnection implements the basic registration RPC between components
 *        |
 *    ~~~~~~~~ Akka RPC
 *
 * 2. Run in Resource Manager
 *    Execution has now reached the RM; the main job is to add a target to the RM's Sender HM.
 *
 *    registerTaskExecutor@ResourceManager
 *        |
 *        +----> taskExecutorGatewayFuture.handleAsync
 *        |      // asynchronous continuation
 *        |
 *        +----> registerTaskExecutorInternal@ResourceManager
 *        |      // RM-internal implementation; registers the TM with the RM
 *        |
 *        +----> taskManagerHeartbeatManager.monitorTarget
 *        |      // creates a HeartbeatMonitor
 *        |
 *        +----> heartbeatTargets.put(resourceID, heartbeatMonitor);
 *        |      // puts the monitor into the Sender HM in the RM, i.e. the RM now monitors this TM
 *        |
 *    ~~~~~~~~ Akka RPC
 *
 * 3. Run in Task Manager
 *    Execution is back in the TM; the main job is to add a target to the TM's Receiver HM.
 *
 *    onRegistrationSuccess@TaskExecutorToResourceManagerConnection
 *        |
 *        +----> onRegistrationSuccess@ResourceManagerRegistrationListener
 *        |      // callback
 *        |
 *        +----> runAsync(establishResourceManagerConnection)
 *        |      // executed asynchronously
 *        |
 *        +----> establishResourceManagerConnection@TaskExecutor
 *        |      // the connection to the RM is established, so the TM can start monitoring the RM
 *        |
 *        +----> resourceManagerHeartbeatManager.monitorTarget
 *        |      // creates a HeartbeatMonitor
 *        |
 *        +----> heartbeatTargets.put(resourceID, heartbeatMonitor);
 *        |      // the RM is now registered in the TM as well
 *        |      // monitor the resource manager as heartbeat target
The same flow in more detail. In the TM, the leader retrieval service is started first:

    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    // In this local setup resourceManagerLeaderRetriever is backed by EmbeddedLeaderService:
    // "A simple leader election service, which selects a leader among contenders and notifies listeners."
    resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

In the RM, registering the TM adds it as a heartbeat target:

    taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
        public void receiveHeartbeat(ResourceID resourceID, Void payload) {
            // the ResourceManager will always send heartbeat requests to the
            // TaskManager
        }

        public void requestHeartbeat(ResourceID resourceID, Void payload) {
            taskExecutorGateway.heartbeatFromResourceManager(resourceID);
        }
    });
Once registration is complete, the Sender HM inside the RM looks like this; note that it now contains one target:

    taskManagerHeartbeatManager = {HeartbeatManagerSenderImpl@8866}
     heartbeatPeriod = 10000
     heartbeatTimeoutIntervalMs = 50000
     ownResourceID = {ResourceID@8871} "040709f36ebf38f309fed518a88946af"
     heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872}
     mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8873}
     heartbeatTargets = {ConcurrentHashMap@8875} size = 1
      {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71" -> {HeartbeatMonitorImpl@9448}
       key = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
       value = {HeartbeatMonitorImpl@9448}
        resourceID = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
        heartbeatTarget = {ResourceManager$2@8868}
        scheduledExecutor = {RpcEndpoint$MainThreadExecutor@8873}
        heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872}
        heartbeatTimeoutIntervalMs = 50000
        futureTimeout = {ScheduledFutureAdapter@10140}
        state = {AtomicReference@9786} "RUNNING"
        lastHeartbeat = 0
The RM then returns to the TaskExecutor over RPC, where the following runs inside monitorTarget:

    HeartbeatMonitor<O> heartbeatMonitor = heartbeatMonitorFactory.createHeartbeatMonitor(...);
    heartbeatTargets.put(resourceID, heartbeatMonitor);
Once registration is complete, the TM's Receiver HM looks like this:

    resourceManagerHeartbeatManager = {HeartbeatManagerImpl@10163}
     heartbeatTimeoutIntervalMs = 50000
     ownResourceID = {ResourceID@8882} "96a9b80c-dd97-4b63-9049-afb6662ea3e2"
     heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425}
     mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@10426}
     heartbeatTargets = {ConcurrentHashMap@10427} size = 1
      {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75" -> {HeartbeatMonitorImpl@10666}
       key = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
       value = {HeartbeatMonitorImpl@10666}
        resourceID = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
        heartbeatTarget = {TaskExecutor$1@10668}
        scheduledExecutor = {RpcEndpoint$MainThreadExecutor@10426}
        heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425}
        heartbeatTimeoutIntervalMs = 50000
        futureTimeout = {ScheduledFutureAdapter@10992}
        state = {AtomicReference@10667} "RUNNING"
        lastHeartbeat = 0
Registration between the TM and the JM follows the same idea: each side registers a monitor that represents the other. On the TM side:
JobLeaderListenerImpl ----> establishJobManagerConnection
When the message reaches the JM, the JM does the following.
registerTaskManager ----> taskManagerHeartbeatManager.monitorTarget // monitor the task manager as heartbeat target
After a job has been submitted, the regular heartbeat monitoring flow begins. We again use the TM and the RM to demonstrate it.
First, the flow:
 * 1. Run in Resource Manager
 *
 *    HeartbeatManagerSender in RM
 *        |
 *        +----> run@HeartbeatManagerSenderImpl
 *        |      // walks through all monitors (targets) and calls requestHeartbeat on each target
 *        |
 *        +----> requestHeartbeat@HeartbeatManagerSenderImpl
 *        |      // calls the custom function of the concrete monitored target
 *        |      // heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
 *        |
 *        +----> getHeartbeatListener().retrievePayload
 *        |      // ends up in TaskManagerHeartbeatListener@ResourceManager
 *        |      // which returns null, because the RM is never anyone's Receiver
 *        |
 *        +----> requestHeartbeat@HeartbeatTarget
 *        |      // the target's code lives in ResourceManager; it was supplied when the target was created
 *        |
 *        +----> taskExecutorGateway.heartbeatFromResourceManager
 *        |      // goes to the TM through the gateway RPC; this is the active heartbeat request to the TM
 *        |
 *    ~~~~~~~~ Akka RPC
 *
 * 2. Run in Task Manager
 *    Execution has now reached the TM; the main jobs are 1. reset the TM's monitor; 2. return some payload.
 *
 *    heartbeatFromResourceManager@TaskExecutor
 *        |
 *        +----> resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
 *        |      // enters the Receiver HM in the Task Manager
 *        |
 *        +----> requestHeartbeat@HeartbeatManager in TM
 *        |      // runs inside the Receiver HM in the Task Manager
 *        |
 *        +----> reportHeartbeat@HeartbeatMonitor
 *        |      // records the time of this request, then calls resetHeartbeatTimeout
 *        |
 *        +----> resetHeartbeatTimeout@HeartbeatMonitor
 *        |      // if the monitor is still RUNNING, cancel the previously scheduled ScheduledFuture
 *        |      // and create a new one. Without the cancel, the old ScheduledFuture would run
 *        |      // HeartbeatMonitorImpl.run, which after compareAndSet reports a timeout by
 *        |      // calling heartbeatListener.notifyHeartbeatTimeout.
 *        |      // Reaching this point means the RM is healthy.
 *        |
 *        +----> heartbeatListener.reportPayload
 *        |      // passes the target's latest heartbeatPayload to the heartbeatListener.
 *        |      // The heartbeatListener is supplied from outside and acts on the heartbeat records of its nodes.
 *        |
 *        +----> heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
 *        |
 *        +----> retrievePayload@ResourceManagerHeartbeatListener in TM
 *        |      // evaluated as the argument of heartbeatTarget.receiveHeartbeat
 *        |
 *        +----> return new TaskExecutorHeartbeatPayload
 *        |
 *        +----> receiveHeartbeat in TM
 *        |      // back in heartbeatTarget.receiveHeartbeat, the custom function supplied when the TM created the target;
 *        |      // it sends a heartbeat response back to the RM
 *        |
 *        +----> resourceManagerGateway.heartbeatFromTaskManager
 *        |      // goes to the ResourceManager through the gateway RPC
 *        |
 *    ~~~~~~~~ Akka RPC
 *
 * 3. Run in Resource Manager
 *    Execution is back in the RM; the main jobs are 1. reset the RM's monitor; 2. report the payload received from the TaskExecutor.
 *
 *    heartbeatFromTaskManager in RM
 *        |
 *        +----> taskManagerHeartbeatManager.receiveHeartbeat
 *        |      // this is a Sender HM
 *        |
 *        +----> HeartbeatManagerImpl.receiveHeartbeat
 *        |
 *        +----> HeartbeatManagerImpl.reportHeartbeat(heartbeatOrigin);
 *        |
 *        +----> heartbeatMonitor.reportHeartbeat();
 *        |      // resets the RM's monitor for this TM: cancelTimeout cancels the timeout task scheduled at
 *        |      // registration and a new futureTimeout is scheduled; this means the TM is running normally
 *        |
 *        +----> heartbeatListener.reportPayload
 *        |      // passes the target's latest heartbeatPayload to TaskManagerHeartbeatListener.
 *        |      // The heartbeatListener is supplied from outside and acts on the heartbeat records of its nodes.
 *        |
 *        +----> slotManager.reportSlotStatus(instanceId, payload.getSlotReport());
 *        |      // called in TaskManagerHeartbeatListener; reports the slot status received from the TaskExecutor
 *        |
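The same flow in more detail.
The heartbeat is initiated by the Sender. Here that is the HeartbeatManagerSenderImpl in the ResourceManager, whose run method is invoked on a schedule. It walks through all monitors (targets) and calls requestHeartbeat on each target.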
    // code in HeartbeatManagerSenderImpl
    @Override
    public void run() {
        if (!stopped) {
            for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
                // send one heartbeat request to the monitored node, carrying the payload,
                // and ask the monitored node to answer with a heartbeat
                requestHeartbeat(heartbeatMonitor);
            }
            getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
        }
    }

    // variables at run time
    this = {HeartbeatManagerSenderImpl@9037}
     heartbeatPeriod = 10000
     heartbeatTimeoutIntervalMs = 50000
     ownResourceID = {ResourceID@8788} "d349506cae32cadbe99b9f9c49a01c95"
     heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8789}
     mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8790}

    // call stack
    requestHeartbeat:711, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
    requestHeartbeat:702, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
    requestHeartbeat:92, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
    run:81, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
    call:511, Executors$RunnableAdapter (java.util.concurrent)
    run$$$capture:266, FutureTask (java.util.concurrent)
    run:-1, FutureTask (java.util.concurrent)
For each concrete target, the custom requestHeartbeat of that target is called.

    // in HeartbeatManagerSenderImpl
    private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
        O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
        final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
        // this is the concrete monitored target
        heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
    }

    // variables at run time
    heartbeatTarget = {ResourceManager$2@10688}
     taskExecutorGateway = {$Proxy42@9459} "org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler@6d0c8334"
     this$0 = {StandaloneResourceManager@9458}
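Note that every target is created by the ResourceManager, which registered this HeartbeatTarget when it started monitoring the TM.
The HeartbeatTarget defines its two methods as follows:
receiveHeartbeat: empty, because the RM always plays the Sender role towards the TM and never has to answer a heartbeat request.
requestHeartbeat: aimed at the TM; it calls the TM's heartbeatFromResourceManager, over RPC of course.
So the call lands in the requestHeartbeat defined by the ResourceManager, which goes through the gateway to the TM. This is the active heartbeat request to the TM.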
    taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
        @Override
        public void receiveHeartbeat(ResourceID resourceID, Void payload) {
            // the ResourceManager will always send heartbeat requests to the TaskManager
        }

        @Override
        public void requestHeartbeat(ResourceID resourceID, Void payload) {
            // this is where the call arrives
            taskExecutorGateway.heartbeatFromResourceManager(resourceID);
        }
    });
Through the taskExecutorGateway, execution of the heartbeat jumps from the RM to the TM over RPC.
The call taskExecutorGateway.heartbeatFromResourceManager means: go back to the TaskExecutor through an RPC call. The method is declared in TaskExecutorGateway.

    // TaskExecutor RPC gateway interface.
    public interface TaskExecutorGateway extends RpcGateway

TaskExecutor implements TaskExecutorGateway, so the interface method is implemented inside TaskExecutor.
    @Override
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        // the call arrives here
        ...........
        resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

In the TM, resourceManagerHeartbeatManager is declared as follows.

    /** The heartbeat manager for resource manager in the task manager. */
    private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
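So what runs next is the Receiver HM in the TM. It performs two steps: it resets the TM's monitor for the RM, and it answers the RM with a heartbeat that carries the TM's payload.
Concretely, requestHeartbeat@HeartbeatManager is called: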
    @Override
    public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
        if (!stopped) {
            log.debug("Received heartbeat request from {}.", requestOrigin);

            final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);

            if (heartbeatTarget != null) {
                if (heartbeatPayload != null) {
                    heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
                }

                heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
            }
        }
    }
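Finally, resourceManagerGateway.heartbeatFromTaskManager takes the call to the ResourceManager.
When the ResourceManager receives this RPC request, its heartbeatFromTaskManager method calls receiveHeartbeat on taskManagerHeartbeatManager. Again there are two steps: the RM's monitor for the TM is reset, and the payload received from the TaskExecutor is reported to the listener.
That completes one full heartbeat round. The next round is started according to heartbeatInterval.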
Now for the timeout path. In HeartbeatMonitorImpl, a timeout calls the listener.

    public void run() {
        // The heartbeat has timed out if we're in state running
        if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            heartbeatListener.notifyHeartbeatTimeout(resourceID);
        }
    }
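The compareAndSet guard is what ensures the listener is notified at most once, and not at all if the monitor was cancelled first. A minimal stand-alone sketch of that state transition follows; `OneShotTimeout` is a hypothetical class and a counter replaces the listener call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical one-shot timeout guarded by an atomic state transition.
class OneShotTimeout implements Runnable {
    enum State { RUNNING, TIMEOUT, CANCELED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final AtomicInteger notifications = new AtomicInteger();

    // Only the caller that wins the RUNNING -> TIMEOUT transition notifies.
    @Override
    public void run() {
        if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            notifications.incrementAndGet();
        }
    }

    // Cancelling only succeeds while the monitor is still RUNNING.
    void cancel() {
        state.compareAndSet(State.RUNNING, State.CANCELED);
    }

    int notifications() {
        return notifications.get();
    }

    // Fires the timeout twice; only the first call notifies.
    static int runTwice() {
        OneShotTimeout timeout = new OneShotTimeout();
        timeout.run();
        timeout.run();
        return timeout.notifications();
    }

    // Cancels first; a late-firing timeout task then does nothing.
    static int cancelThenRun() {
        OneShotTimeout timeout = new OneShotTimeout();
        timeout.cancel();
        timeout.run();
        return timeout.notifications();
    }
}
```

This is why a timeout task that fires concurrently with a cancel or a second firing cannot produce a duplicate notification.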
In the TM this leads to ResourceManagerHeartbeatListener, which tries to reconnect to the RM.

    private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {

        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceId) {
            validateRunsInMainThread();
            // first check whether the timeout is still valid
            if (establishedResourceManagerConnection != null
                    && establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId)) {
                reconnectToResourceManager(new TaskManagerException(
                    String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
            } else {
                .....
            }
        }
    }
The RM is blunt about it and simply closes the connection. This is where the exception from the beginning of the article comes from.

    private class TaskManagerHeartbeatListener implements HeartbeatListener<TaskExecutorHeartbeatPayload, Void> {

        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceID) {
            validateRunsInMainThread();
            closeTaskManagerConnection(
                resourceID,
                new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out."));
        }
    }
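That covers the heartbeat mechanism, but how do we get rid of the exception we started with? When creating the execution environment at the very beginning of the program, set a larger heartbeat timeout (in milliseconds) in its configuration:

    Configuration conf = new Configuration();
    conf.setString("heartbeat.timeout", "18000000");
    final LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);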