来源 | eBay Unified Monitoring Platform 翻译 | 顾欣怡html
Sherlock.IO 是 eBay 现有的监控平台,天天要处理上百亿条日志、事件和指标。Flink Streaming job 实时处理系统用于处理其中的日志和事件。本文将结合监控系统 Flink 的现状,具体讲述 Flink 在监控系统上的实践和应用,但愿给同业人员一些借鉴和启发。算法
##一. 监控系统 Flink 的现状apache
eBay 的监控平台 Sherlock.IO 天天处理着上百亿条日志(log),事件(event)和指标(metric)。经过构建 Flink Streaming job 实时处理系统,监控团队可以及时将日志和事件的处理结果反馈给用户。当前,监控团队维护着 8 个 Flink 集群,最大的集群规模达到上千个 TaskManager,总共运行着上百个做业(job),一些做业已经稳定运行了半年以上。微信
为了让用户和管理员可以更加快捷地建立Flink做业并调整参数,监控团队在 Flink 上搭建了一套元数据微服务(metadata service),该服务可以用Json来描述一个做业的 DAG,且相同的 DAG 共用同一个做业,可以更加方便地建立做业,无需调用 Flink API。Sherlock.IO 流处理总体的架构如图1所示。网络
图1 Sherlock.IO 流处理总体架构架构
目前,用这套元数据微服务建立的做业仅支持以 Kafka 做为数据源,只要数据接入到 Kafka,用户就能够定义 Capability 来处理逻辑从而经过 Flink Streaming 处理数据。框架
元数据微服务框架如图 2 所示,最上层是元数据微服务提供的 Restful API, 用户经过调用 API 来描述和提交做业。描述做业的元数据包含三个部分:Resource,Capability 和 Policy。Flink 适配器(Adaptor)链接了 Flink Streaming API 和元数据微服务 API,且会根据元数据微服务描述的做业调用 Flink Streaming API 来建立做业,从而屏蔽 Flink StreamAPI。运维
所以,用户不用了解 Flink Streaming API 就能够建立 Flink 做业。将来若是须要迁移到其余的流处理框架,只要增长一个适配器,就能够将现有的做业迁移到新的流处理框架上。微服务
图2 元数据微服务框架工具
Capability 定义了做业的 DAG 以及每一个算子(Operator)所用的 Class,图 3 是事件处理(eventProcess) Capability,它最终会生成如图 4 的 DAG。事件处理 Capability 先从 Kafka 读出数据,再写到 Elasticsearch 中。该 Capability 将该做业命名为“eventProcess”,并定义其并行度为“5”,其算子为“EventEsIndexSinkCapability”, 其数据流为“Source –> sink”。
图3 eventESSink Capability
图4 生成的Flink做业
每一个命名空间(Namespace)须要定义一个或多个 Policy,每一个 Policy 指定了相应的 Capability,即指定了用哪一套 DAG 来运行这个 Policy。Policy 还定义了这个做业的相关配置,例如从哪一个 Kafka topic 中读取数据,写到 ElasticSearch 的哪一个索引(Index)中,中间是否要跳过某些算子等等。
其次,Policy 还能做为一个简易的过滤器(Filter),能够经过配置 Jexl 表达式过滤掉一些不须要的数据,提升做业的吞吐量。
另外,咱们还实现了 Zookeeper 定时更新的机制,使得 Policy 修改后再也不须要重启做业,只要是在更新时间间隔内,该命名空间的 Policy 修改就会被自动应用到做业上。图 5 是命名空间为 paas 的 Policy 示例。
图5 paas alertESSink Policy
Resource 定义了某个命名空间所须要的资源,好比 Flink 集群, Kafka broker,ES 集群等等。咱们有多个 Flink 集群和 ES 集群,经过 Resource 配置,做业能够知道某个命名空间的日志应该写到哪一个 ES 集群,并能够判断该命名空间的数据应该从哪一个 Kafka 集群读取。
为了减小做业数量,咱们可让相同的 DAG 复用同一个做业。咱们先给不一样的 Policy 指定相同的 Capability,在该 Capability 资源足够的状况下,这些 Policy 就会被调度到同一个做业上。
以 SQL 的 Capability 为例,每一个 Policy 的 SQL 语句不尽相同,若是为每一个 Policy 都建立一个做业, Job Manager 的开销就会很大,且很差管理。所以,咱们能够为 SQL Capability 配置 20 个 Slot,每一个 Policy 占用一个 Slot。那么该 Capability 生成的做业就能够运行 20 个 Policy。
做业运行时,从 Source 读进来的数据会被打上相应 Policy 的标签,并执行该 Policy 定义的 SQL 语句,从而实现不一样 Policy 共享同一个做业,大大减小了做业的数量。
用共享做业还有一个好处:若是多个命名空间的数据在一个 Kafka topic 里,那么只要读一遍数据便可,不用每一个命名空间都读一次 topic 再过滤,这样就大大提升了处理的效率。
了解元数据驱动后,让咱们来看看能够经过哪些方法实现 Flink 做业的而优化和监控。
在 Flink 集群的运维过程当中,咱们很难监控做业的运行状况。即便开启了检查点(checkpoint),咱们也没法肯定是否丢失数据或丢失了多少数据。所以,咱们为每一个做业注入了 Heartbeat 以监控其运行状况。
Heartbeat 就像 Flink 中用来监控延迟的“LatencyMarker”同样,它会流过每一个做业的管道。但与 LatencyMarker 不一样的是,当 Heartbeat 遇到 DAG 的分支时,它会分裂并流向每一个分支,而不像 LatencyMarker 那样随机流向某一个分支。另外一个不一样点在于 Heartbeat 不是由 Flink 自身产生,而是由元数据微服务定时产生,然后由每一个做业消费。
如图 4 所示,每一个做业在启动的时候会默认加一个 Heartbeat 的数据源。Heartbeat 流入每一个做业后,会随数据流一块儿通过每一个节点,在每一个节点上打上当前节点的标签,而后跳过该节点的处理逻辑流向下个节点。直到 Heartbeat 流到最后一个节点时,它会以指标(Metric)的形式发送到 Sherlock.IO(eBay 监控平台)。
该指标包含了 Heartbeat 产生的时间,流入做业的时间以及到达每一个节点的时间。经过这个指标,咱们能够判断该做业在读取 kafka 时是否延时,以及一条数据被整个管道处理所用的时间和每一个节点处理数据所用的时间,进而判断该做业的性能瓶颈。
因为 Heartbeat 是定时发送的,所以每一个做业收到的 Heartbeat 个数应该一致。若最后发出的指标个数与指望不一致,则能够进一步判断是否有数据丢失。
图 6 描述了某 Flink 做业中的数据流以及 Heartbeat 的运行状态:
图6 Heartbeat在做业中的运行过程
有了 Heartbeat,咱们就能够用来定义集群的可用性。首先,咱们须要先定义在什么状况下属于不可用的:
当内存不足(OutofMemory)或代码运行错误时,做业就可能会意外重启。咱们认为重启过程当中形成的数据丢失是不可用的状况之一。所以咱们的目标之一是让 Flink 做业可以长时间稳定运行。
有时由于基础设施的问题致使物理机或者容器没启动起来,或是在 Flink 做业发生重启时因为 Slot 不够而没法启动,或者是由于 Flink 做业的重启次数已经超过了最大重启次数(rest.retry.max-attempts), Flink 做业就会停止。此时须要人工干预才能将做业从新启动起来。
咱们认为 Flink 做业停止时,也是不可用的状况之一。
发生这种状况,通常是由于遇到了反压(BackPressure)。形成反压的缘由有不少种,好比上游的流量过大,或者是中间某个算子的处理能力不够,或者是下游存储节点遇到性能瓶颈等等。虽然短期内的反压不会形成数据丢失,但它会影响数据的实时性,最明显的变化是延迟这个指标会变大。
咱们认为反压发生时是不可用的状况之一。
针对以上三种状况,咱们均可以用 Heartbeat 来监控,并计算可用性。好比第一种状况,若是做业重启时发生了数据丢失,那么相应的那段管道的 Heartbeat 也会丢失,从而咱们能够监测出是否有数据丢失以及粗粒度地估算数据丢了多少。对于第二种状况,看成业停止时,HeartBeat 也不会被处理,所以能够很快发现做业中止运行并让 on-call 及时干预。第三种状况当反压发生时,HeartBeat 也会被阻塞在发生反压的上游,所以 on-call 也能够很快地发现反压发生并进行人工干预。
综上,Heartbeat 能够很快监测出 Flink 做业的运行状况。那么,如何评估可用性呢?因为 Heartbeat 是定时发生的,默认状况下咱们设置每 10 秒发一次。1 分钟内咱们指望每一个做业的每条管道可以发出 6 个带有做业信息的 heartbeat,那么天天就能够收到 8640 个 Heartbeat。
所以,一个做业的可用性能够定义为:
Slot 是 Flink 运行做业的最小单位[1],每一个 TaskManager 能够分配一个至多个 Slot(通常分配的个数为该 TaskManager 的 CPU 数)。根据 Flink 做业的并行度,一个做业能够分配到多个 TaskManager 上,而一个 TaskManager 也可能运行着多个做业。然而,一个 TaskManager 就是一个 JVM,当多个做业分配到一个 TaskManager 上时,就会有抢夺资源的状况发生。
例如,我一个 TaskManager 分配了 3 个 Slot(3 个 CPU)和 8G 堆内存。当 JobManager 调度做业的时候,有可能将 3 个不一样做业的线程调度到该 TaskManager 上,那么这 3 个做业就会同时抢夺 CPU 和内存的资源。当其中一个做业特别耗 CPU 或内存的时候,就会影响其余两个做业。
在这种状况下,咱们经过配置 Flink 能够实现做业的隔离,如图 7 所示:
图7 Flink 做业隔离先后的调度图
经过配置:
“taskmanager.numberOfTaskSlots: 1”:能够设置每一个TaskManager只有一个Slot; “cpu_period”和“cpu_quota”:能够限定每一个TaskManager的CPU个数 “taskmanager.heap.mb”能够配置每一个TaskManager的JVM的内存大小。
经过以上配置,能够限定每一个 TaskManager 独占 CPU 和内存的资源,且不会多个做业抢占,实现做业之间的隔离。
咱们运维 Flink 集群的时候发现,出现最多的问题就是反压。在 3.2 中提到过,发生反压的缘由有不少种,但不管什么缘由,数据最终都会被积压在发生反压上游的算子的本地缓冲区(localBuffer)中。
咱们知道,每个 TaskManager 有一个本地缓冲池, 每个算子数据进来后会把数据填充到本地缓冲池中,数据从这个算子出去后会回收这块内存。当被反压后,数据发不出去,本地缓冲池内存就没法释放,致使一直请求缓冲区(requestBuffer)。
因为 Heartbeat 只能监控出是否发生了反压,但没法定位到是哪一个算子出了问题,所以咱们定时地将每一个算子的 StackTrace 打印出来,当发生反压时,经过 StackTrace 就能够知道是哪一个算子的瓶颈。
如图8所示,咱们能够清晰地看到发生反压的 Flink 做业及其所在的 Taskmanager。再经过 Thread Dump,咱们就能够定位到代码的问题。
图8 发生反压的StackTrace (点击观看大图)
Flink 自己提供了不少有用的指标[2]来监控 Flink 做业的运行状况,在此基础上咱们还加了一些业务上的指标。除此以外,咱们还使用了如下工具监控 Flink 做业。
Flink 的 History server[3]能够查询已完成做业的状态和指标。好比一个做业的重启次数、它运行的时间。咱们经常用它找出运行不正常的做业。好比,咱们能够经过 History server 的 attempt 指标知道每一个做业重启的次数,从而快速去现场找到重启的缘由,避免下次再发生。
虽然 Flink 有 HA 的模式,但在极端状况下,例如整个集群出现问题时,须要 on-call 即时发觉并人工干预。咱们在元数据微服务中保存了最后一次提交做业成功的元数据,它记录了在每一个 Flink 集群上应该运行哪些做业。守护线程(Daemon thread)会每分钟去比较这个元数据和 Flink 上运行的做业,若发现 JobManager 连不通或者有做业运行不一致则马上发出告警(Alert)通知 on-call。
下面介绍几个已经运行在监控系统上的 Flink 流处理系统的应用:
当前监控团队是基于 Flink Streaming 作事件告警(Event alerting),咱们定义了一个告警算子 EventAlertingCapability,该 Capability 能够处理每一个 Policy 自定义的规则。如图 9 定义的一条性能监控规则:
该规则的含义是当性能检测器的应用为“r1rover”, 主机以“r1rover”开头,且数值大于 90 时,就触发告警。且生成的告警会发送到指定的 Kafka topic 中供下游继续处理。
图9 Single-Threshold1 Policy (点击查看大图)
Eventzon 就像 eBay 的事件中心,它收集了从各个应用,框架,基础架构发过来的事件,最后经过监控团队的 Flink Streaming 实时生成告警。因为各个事件的数据源不一样,它们的元数据也不一样,所以没法用一条统一的规则来描述它。
咱们专门定义了一套做业来处理 Eventzon 的事件,它包含了多个 Capability,好比 Filter Capability,用来过滤非法的或者不符合条件的事件; 又好比 Deduplicate Capability,能够用来去除重复的事件。Eventzon 的全部事件通过一整套做业后,会生成有效的告警,并根据通知机制经过 E-mail、Slack 或 Pagerduty 发给相关团队。
Netmon 的全称为 Network Monitoring, 即网络监控,它能够用来监控整个 eBay 网络设备的健康状态。它的数据源来自 eBay 的交换机,路由器等网络设备的日志。Netmon 的做用是根据这些日志找出一些特定的信息,每每是一些错误的日志,以此来生成告警。
eBay 的每一台设备都要“登记造册”,每台设备将日志发过来后,咱们经过 EnrichCapability 从“册子”中查询这台设备的信息,并把相关信息好比 IP 地址,所在的数据中心,所在的机架等填充到日志信息中做为事件保存。当设备产生一些特定的错误日志时, 它会被相应的规则匹配而后生成告警,该告警会被 EventProcess Capability 保存到 Elasticsearch 中实时显示到 Netmon 的监控平台(dashboard)上。有时由于网络抖动致使一些短暂的错误发生,但系统过一下子就会自动恢复。
当上述状况发生时,Netmon 会有相应的规则将发生在网络抖动时生成的告警标记为“已解决”(Resolved)。对于一些必须人工干预的告警,运维人员能够经过网络监控平台(Netmon dashboard)手动点击“已解决”,完成该告警的生命周期。
eBay 的监控团队但愿能根据用户提供的指标、事件和日志以及相应的告警规则实时告警用户。Flink Streaming 可以提供低延时的处理从而可以达到咱们低延时的要求,而且它适合比较复杂的处理逻辑。
然而在运维 Flink 的过程当中,咱们也发现了因为做业重启等缘由致使误报少报告警的状况发生,从而误导客户。所以从此咱们会在 Flink 的稳定性和高可用性上投入更多。咱们也但愿在监控指标、日志上可以集成一些复杂的 AI 算法,从而可以生成更加有效精确的告警,成为运维人员的一把利器。
参考文献: [1]ci.apache.org/projects/fl… [2]ci.apache.org/projects/fl… [3]ci.apache.org/projects/fl…
▼ Flink 社区推荐 ▼
史上超强阵容,Flink Forward Asia 2019 你报名了吗?
关注 Flink 官方社区微信公众号,了解更多 Flink 资讯!