PyFlink 教程:PyFlink DataStream API - state & timer

简介:介绍如何在 Python DataStream API 中使用 state & timer 功能。

1、背景

Flink 1.13 已于近期正式发布,超过 200 名贡献者参与了 Flink 1.13 的开发,提交了超过 1000 个 commits,完成了若干重要功能。其中,PyFlink 模块在该版本中也新增了若干重要功能,好比支持了 state、自定义 window、row-based operation 等。随着这些功能的引入,PyFlink 功能已经日趋完善,用户可使用 Python 语言完成绝大多数类型Flink做业的开发。接下来,咱们详细介绍如何在 Python DataStream API 中使用 state & timer 功能。python

2、state 功能介绍

做为流计算引擎,state 是 Flink 中最核心的功能之一。sql

  • 在 1.12 中,Python DataStream API 尚不支持 state,用户使用 Python DataStream API 只能实现一些简单的、不须要使用 state 的应用;
  • 而在 1.13 中,Python DataStream API 支持了此项重要功能。

state 使用示例

以下是一个简单的示例,说明如何在 Python DataStream API 做业中使用 state:编程

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        state_desc = ValueStateDescriptor('cnt', Types.LONG())
        # 定义value state
        self.cnt_state = runtime_context.get_state(state_desc)

    def map(self, value):
        cnt = self.cnt_state.value()
        if cnt is None:
            cnt = 0

        new_cnt = cnt + 1
        self.cnt_state.update(new_cnt)
        return value[0], new_cnt


def state_access_demo():
    # 1. 建立 StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()

    # 2. 建立数据源
    seq_num_source = NumberSequenceSource(1, 100)
    ds = env.from_source(
        source=seq_num_source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name='seq_num_source',
        type_info=Types.LONG())

    # 3. 定义执行逻辑
    ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
           .key_by(lambda a: a[0]) \
           .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))

    # 4. 将打印结果数据
    ds.print()

    # 5. 执行做业
    env.execute()


if __name__ == '__main__':
    state_access_demo()

在上面的例子中,咱们定义了一个 MapFunction,该 MapFunction 中定义了一个名字为 “cnt\_state” 的 ValueState,用于记录每个 key 出现的次数。缓存

说明:架构

  • 除了 ValueState 以外,Python DataStream API 还支持 ListState、MapState、ReducingState,以及 AggregatingState;
  • 定义 state 的 StateDescriptor 时,须要声明 state 中所存储的数据的类型(TypeInformation)。另外须要注意的是,当前 TypeInformation 字段并未被使用,默认使用 pickle 进行序列化,所以建议将 TypeInformation 字段定义为 Types.PICKLED\_BYTE\_ARRAY() 类型,与实际所使用的序列化器相匹配。这样的话,当后续版本支持使用 TypeInformation 以后,能够保持后向兼容性;
  • state 除了能够在 KeyedStream 的 map 操做中使用,还能够在其它操做中使用;除此以外,还能够在链接流中使用 state,好比:
ds1 = ...  # type DataStream
ds2 = ...  # type DataStream
ds1.connect(ds2) \
    .key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: a[0]) \
    .map(MyCoMapFunction())  # 能够在MyCoMapFunction中使用state

可使用 state 的 API 列表以下:app

操做 自定义函数
KeyedStream map MapFunction
flat\_map FlatMapFunction
reduce ReduceFunction
filter FilterFunction
process KeyedProcessFunction
ConnectedStreams map CoMapFunction
flat\_map CoFlatMapFunction
process KeyedCoProcessFunction
WindowedStream apply WindowFunction
process ProcessWindowFunction

state 工做原理

img

上图是 PyFlink 中,state 工做原理的架构图。从图中咱们能够看出,Python 自定义函数运行在 Python worker 进程中,而 state backend 运行在 JVM 进程中(由 Java 算子来管理)。当 Python 自定义函数须要访问 state 时,会经过远程调用的方式,访问 state backend。机器学习

咱们知道,远程调用的开销是很是大的,为了提高 state 读写的性能,PyFlink 针对 state 读写作了如下几个方面的优化工做:异步

  • Lazy Read:编程语言

    对于包含多个 entry 的 state,好比 MapState,当遍历 state 时,state 数据并不会一次性所有读取到 Python worker 中,只有当真正须要访问时,才从 state backend 读取。函数

  • Async Write:

    当更新 state 时,更新后的 state,会先存储在 LRU cache 中,并不会同步地更新到远端的 state backend,这样作能够避免每次 state 更新操做都访问远端的 state backend;同时,针对同一个 key 的屡次更新操做,能够合并执行,尽可能避免无效的 state 更新。

  • LRU cache:

    在 Python worker 进程中维护了 state 读写的 cache。当读取某个 key 时,会先查看其是否已经被加载到读 cache 中;当更新某个 key 时,会先将其存放到写 cache 中。针对频繁读写的 key,LRU cache 能够避免每次读写操做,都访问远端的 state backend,对于有热点 key 的场景,能够极大提高 state 读写性能。

  • Flush on Checkpoint:

    为了保证 checkpoint 语义的正确性,当 Java 算子须要执行 checkpoint时,会将 Python worker中的写 cache 都 flush 回 state backend。

其中 LRU cache 能够细分为二级,以下图所示:

img

说明:

  • 二级 cache 为 global cache,二级 cache 中的读 cache 中存储着当前 Python worker 进程中全部缓存的原始 state 数据(未反序列化);二级 cache 中的写 cache 中存储着当前 Python worker 进程中全部建立的 state 对象。
  • 一级 cache 位于每个 state 对象内,在 state 对象中缓存着该 state 对象已经从远端的 state backend 读取的 state 数据以及待更新回远端的 state backend 的 state 数据。

工做流程:

  • 当在 Python UDF 中,建立一个 state 对象时,首先会查看当前 key 所对应的 state 对象是否已经存在(在二级 cache 中的 “Global Write Cache” 中查找),若是存在,则返回对应的 state 对象;若是不存在,则建立新的 state 对象,并存入 “Global Write Cache”;
  • state 读取:当在 Python UDF 中,读取 state 对象时,若是待读取的 state 数据已经存在(一级 cache),好比对于 MapState,待读取的 map key/map value 已经存在,则直接返回对应的 map key/map value;不然,访问二级 cache,若是二级 cache 中也不存在待读取的 state 数据,则从远端的 state backend 读取;
  • state 写入:当在 Python UDF 中,更新 state 对象时,先写到 state 对象内部的写 cache 中(一级 cache);当 state 对象中待写回 state backend 的 state 数据的大小超过指定阈值或者当遇到 checkpoint 时,将待写回的 state 数据写回远端的 state backend。

state 性能调优

经过前一节的介绍,咱们知道 PyFlink 使用了多种优化手段,用于提高 state 读写的性能,这些优化行为能够经过如下参数配置:

配置 说明
python.state.cache-size Python worker 中读 cache 以及写 cache 的大小。(二级 cache)须要注意的是:读 cache、写 cache是独立的,当前不支持分别配置读 cache 以及写 cache 的大小。
python.map-state.iterate-response-batch-size 当遍历 MapState 时,每次从 state backend 读取并返回给 Python worker 的 entry 的最大个数。
python.map-state.read-cache-size 一个 MapState 的读 cache 中最大容许的 entry 个数(一级 cache)。当一个 MapState 中,读 cache 中的 entry 个数超过该阈值时,会经过 LRU 策略从读 cache 中删除最近最少访问过的 entry。
python.map-state.write-cache-size 一个 MapState 的写 cache 中最大容许的待更新 entry 的个数(一级 cache)。当一个 MapState 中,写 cache 中待更新的 entry 的个数超过该阈值时,会将该 MapState 下全部待更新 state 数据写回远端的 state backend。

须要注意的是,state 读写的性能不只取决于以上参数,还受其它因素的影响,好比:

  • 输入数据中 key 的分布:

    输入数据的 key 越分散,读 cache 命中的几率越低,则性能越差。

  • Python UDF 中 state 读写次数:

    state 读写可能涉及到读写远端的 state backend,应该尽可能优化 Python UDF 的实现,减小没必要要的 state 读写。

  • checkpoint interval:

    为了保证 checkpoint 语义的正确性,当遇到 checkpoint 时,Python worker 会将全部缓存的待更新 state 数据,写回 state backend。若是配置的 checkpoint interval 太小,则可能并不能有效减小 Python worker 写回 state backend 的数据量。

  • bundle size / bundle time:

    当前 Python 算子会将输入数据划分红多个批次,发送给 Python worker 执行。当一个批次的数据处理完以后,会强制将 Python worker 进程中的待更新 state 写回 state backend。与 checkpoint interval 相似,该行为也可能会影响 state 写性能。批次的大小能够经过 python.fn-execution.bundle.size 和 python.fn-execution.bundle.time 参数控制。

3、timer 功能介绍

timer 使用示例

除了 state 以外,用户还能够在 Python DataStream API 中使用定时器 timer。

import datetime

from pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironment


class CountWithTimeoutFunction(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(ValueStateDescriptor(
            "my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = Row(value.f1, 0, 0)

        # update the state's count
        current[1] += 1

        # set the state's timestamp to the record's assigned event time timestamp
        current[2] = ctx.timestamp()

        # write the state back
        self.state.update(current)

        # schedule the next timer 60 seconds from the current event time
        ctx.timer_service().register_event_time_timer(current[2] + 60000)

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # get the state for the key that scheduled the timer
        result = self.state.value()

        # check if this is an outdated timer or the latest timer
        if timestamp == result[2] + 60000:
            # emit the state on timeout
            yield result[0], result[1]


class MyTimestampAssigner(TimestampAssigner):

    def __init__(self):
        self.epoch = datetime.datetime.utcfromtimestamp(0)

    def extract_timestamp(self, value, record_timestamp) -> int:
        return int((value[0] - self.epoch).total_seconds() * 1000)


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.execute_sql("""
            CREATE TABLE my_source (
              a TIMESTAMP(3),
              b VARCHAR,
              c VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'rows-per-second' = '10'
            )
        """)

    stream = t_env.to_append_stream(
        t_env.from_path('my_source'),
        Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
    watermarked_stream = stream.assign_timestamps_and_watermarks(
        WatermarkStrategy.for_monotonous_timestamps()
                         .with_timestamp_assigner(MyTimestampAssigner()))

    # apply the process function onto a keyed stream
    watermarked_stream.key_by(lambda value: value[1])\
        .process(CountWithTimeoutFunction()) \
        .print()

    env.execute()

在上述示例中,咱们定义了一个 KeyedProcessFunction,该 KeyedProcessFunction 记录每个 key 出现的次数,当一个 key 超过 60 秒没有更新时,会将该 key 以及其出现次数,发送到下游节点。

除了 event time timer 以外,用户还可使用 processing time timer。

timer 工做原理

timer 的工做流程是这样的:

  • 与 state 访问使用单独的通讯信道不一样,当用户注册 timer 以后,注册消息经过数据通道发送到 Java 算子;
  • Java 算子收到 timer 注册消息以后,首先检查待注册 timer 的触发时间,若是已经超过当前时间,则直接触发;不然的话,将 timer 注册到 Java 算子的 timer service 中;
  • 当 timer 触发以后,触发消息经过数据通道发送到 Python worker,Python worker 回调用户 Python UDF 中的的 on\_timer 方法。

须要注意的是:因为 timer 注册消息以及触发消息经过数据通道异步地在 Java 算子以及 Python worker 之间传输,这会形成在某些场景下,timer 的触发可能没有那么及时。好比当用户注册了一个 processing time timer,当 timer 触发以后,触发消息经过数据通道传输到 Python UDF 时,可能已是几秒中以后了。

4、总结

在这篇文章中,咱们主要介绍了如何在 Python DataStream API 做业中使用 state & timer,state & timer 的工做原理以及如何进行性能调优。接下来,咱们会继续推出 PyFlink 系列文章,帮助 PyFlink 用户深刻了解 PyFlink 中各类功能、应用场景以及最佳实践等。

另外,阿里云实时计算生态团队长期招聘优秀大数据人才(包括实习 + 社招),咱们的工做包括:

  • 实时机器学习:支持机器学习场景下实时特征工程和 AI 引擎配合,基于 Apache Flink 及其生态打造实时机器学习的标准,推进例如搜索、推荐、广告、风控等场景的全面实时化;
  • 大数据 + AI 一体化:包括编程语言一体化 (PyFlink 相关工做),执行引擎集成化 (TF on Flink),工做流及管理一体化(Flink AI Flow)。

若是你对开源、大数据或者 AI 感兴趣,请发简历到:fudian.fd@alibaba-inc.com

此外,也欢迎你们加入 “PyFlink交流群”,交流 PyFlink 相关的问题。

本文内容由阿里云实名注册用户自发贡献,版权归原做者全部,阿里云开发者社区不拥有其著做权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。若是您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将马上删除涉嫌侵权内容。