Zookeeper connection loss leads to Flink job restart

Flink可使用zookeeper来进行ha,而通常咱们都会使用zookeeper的高级api架构curator来对zk进行通信。在curator中引入了状态的概念,包括connected,reconnected,suspeneded,lost与read_only,其中suspended是个有意思的状态,当由于网络抖动、机器繁忙、zk集群短暂无响应,都会致使curator将状态置为suspended.java

而Flink对suspended采起了很是谨慎的处理,就是发现是suspended,则取消全部做业,进行restart,显得未免有些太敏感了,其实这个时候每每zk也是ok的,相应的jm也是leader都没有问题。api

好,咱们再顺一下:网络

在发生zk connection loss的状况下,curator会设置suspended状态,在此状态下,curator会释放leader,flink在发现notleader以后则会revokeLeadership,进而致使dispatcher会cancel掉全部的job,cancel的过程当中flink会主动抛出异常。session

 

虽然这样作没什么大的影响,由于其实若是connection很快恢复,做业也会很快被拉起,没有大碍,但看起来老是很差,zk链接随便的一个扰动,均可能致使job重启,因此就想把它改动。架构

 

方案一:测试

在flink的ZooKeeperUtils.java经过CuratorFrameworkFactory来构造CuratorFramework时,经过connectionStateErrorPolicy将ConnectionStateErrorPolicy从StandardConnectionStateErrorPolicy更新为SessionConnectionStateErrorPolicy,前者将suspended和lost都做为error,后者只是将lost做为error,而只有发生error的时候才会取消leadership,因此如此设置以后,在进入suspended状态时,不在发生leadership的取消和从新选举。spa

优势:从总体的状态转换上进行了控制,优雅。rest

缺点:目前flink所引用的curator的版本为2.12.0,不支持设置policy,须要更新curator版本号,是否会带来其余问题,不可知。ip

测试:成功。io

更改curator的版本为4.2.0,提交做业,restart zk,job没有重启,checkpoint正常进行。

 

 

方案二:

在flink内部,在代码ZooKeeperLeaderElectionService.java中的notLeader方法中,在收到notleader的通知的时候,根据当前的状态是不是suspended进行相应的处理。

优势:不对flink的总体形成影响,更改在局部范围内可控。

缺点:因为curator对suspended的处理依旧,因此从curator的层面仍是会发生取消leadership而后从新进行选举的状况,虽然这一切都没必要要。

测试:失败

1.原先预计的是在notleader方法中,若是发现当前状态是suspended,就不去执行revokeLeadership方法,但notleader方法和suspended状态的获取分别是在两个回调方法中触发的,通过测试,没法保证两个回调的执行顺序,即有可能notleader方法已经触发,可是suspended状态尚未触发。

2.若是只是修改notleader方法,即便修改为功,仍是会触发isleader方法,在isleader方法中,若是不修改,仍是会触发原有做业的取消和从新提交,因此这里也要改,改为从新连接以后这里即便被通知isleader也不会去给dispatcher进行grantLeadership,但又不能直接这么操做,还须要判断是否本身已是leader,但惋惜的是,在发生suspended的时候,curator里面已经将leadership取消掉了,因此若是在这里加上判断是connected状态而且不是leader而后不去grantleadership,会看起来很奇怪。

总而言之,若是不动curator的逻辑,只是在flink里改,这里的逻辑就会被改的难以理解,而且还没法成功。

 

目前的方案应对的场景是zk connection的短期抖动,若是发生zk connection的长时间不可用,则tm和jm都会失败,这个也是应有之义。

 

另,

在flink中对curator的suspended状态起做用的还有一个地方,在ZooKeeperCheckpointIDCounter.java中有对suspended的判断,若是以前是suspended或者Lost,则flink就不会去zk上存取checkpoint的信息了。这里感受是个坑,也须要改对suspended的策略。

 

外一篇,

zookeeper能够设置session timeout时间,可是不是你随便设置就会起做用,会有一个判断的过程。

SessionTimeOut的协商以下:

  • 状况1: 配置文件配置了maxSessionTimeOut和minSessionTimeOut

最终SessionTimeOut,必须在minSessionTimeOut和maxSessionTimeOut区间里,若是跨越上下界,则以跨越的上届或下界为准。

  • 状况2:配置文件没有配置maxSessionTimeOut和minSessionTimeOut

maxSessionTimeout没配置则 maxSessionTimeOut设置为 20 * tickTime

minSessionTimeOut没配置则 minSessionTimeOut设置为 2 * tickTime

也就是默认状况下, SessionTimeOut的合法范围为 4秒~40秒,默认配置中tickTime为2秒。

相关文章
相关标签/搜索