Flink 1.13 已于近期正式发布,超过 200 名贡献者参与了 Flink 1.13 的开发,提交了超过 1000 个 commits,完成了若干重要功能。其中,PyFlink 模块在该版本中也新增了若干重要功能,好比支持了 state、自定义 window、row-based operation 等。随着这些功能的引入,PyFlink 功能已经日趋完善,用户可使用 Python 语言完成绝大多数类型Flink做业的开发。接下来,咱们详细介绍如何在 Python DataStream API 中使用 state & timer 功能。python
做为流计算引擎,state 是 Flink 中最核心的功能之一。sql
以下是一个简单的示例,说明如何在 Python DataStream API 做业中使用 state:编程
from pyflink.common import WatermarkStrategy, Row from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors import NumberSequenceSource from pyflink.datastream.functions import RuntimeContext, MapFunction from pyflink.datastream.state import ValueStateDescriptor class MyMapFunction(MapFunction): def open(self, runtime_context: RuntimeContext): state_desc = ValueStateDescriptor('cnt', Types.LONG()) # 定义value state self.cnt_state = runtime_context.get_state(state_desc) def map(self, value): cnt = self.cnt_state.value() if cnt is None: cnt = 0 new_cnt = cnt + 1 self.cnt_state.update(new_cnt) return value[0], new_cnt def state_access_demo(): # 1. 建立 StreamExecutionEnvironment env = StreamExecutionEnvironment.get_execution_environment() # 2. 建立数据源 seq_num_source = NumberSequenceSource(1, 100) ds = env.from_source( source=seq_num_source, watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), source_name='seq_num_source', type_info=Types.LONG()) # 3. 定义执行逻辑 ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \ .key_by(lambda a: a[0]) \ .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()])) # 4. 将打印结果数据 ds.print() # 5. 执行做业 env.execute() if __name__ == '__main__': state_access_demo()
在上面的例子中,咱们定义了一个 MapFunction,该 MapFunction 中定义了一个名字为 “cnt_state” 的 ValueState,用于记录每个 key 出现的次数。缓存
说明:架构
ds1 = ... # type DataStream ds2 = ... # type DataStream ds1.connect(ds2) \ .key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: a[0]) \ .map(MyCoMapFunction()) # 能够在MyCoMapFunction中使用state
可使用 state 的 API 列表以下:app
操做 | 自定义函数 | |
---|---|---|
KeyedStream | map | MapFunction |
flat_map | FlatMapFunction | |
reduce | ReduceFunction | |
filter | FilterFunction | |
process | KeyedProcessFunction | |
ConnectedStreams | map | CoMapFunction |
flat_map | CoFlatMapFunction | |
process | KeyedCoProcessFunction | |
WindowedStream | apply | WindowFunction |
process | ProcessWindowFunction |
上图是 PyFlink 中,state 工做原理的架构图。从图中咱们能够看出,Python 自定义函数运行在 Python worker 进程中,而 state backend 运行在 JVM 进程中(由 Java 算子来管理)。当 Python 自定义函数须要访问 state 时,会经过远程调用的方式,访问 state backend。机器学习
咱们知道,远程调用的开销是很是大的,为了提高 state 读写的性能,PyFlink 针对 state 读写作了如下几个方面的优化工做:异步
Lazy Read:编程语言
对于包含多个 entry 的 state,好比 MapState,当遍历 state 时,state 数据并不会一次性所有读取到 Python worker 中,只有当真正须要访问时,才从 state backend 读取。函数
Async Write:
当更新 state 时,更新后的 state,会先存储在 LRU cache 中,并不会同步地更新到远端的 state backend,这样作能够避免每次 state 更新操做都访问远端的 state backend;同时,针对同一个 key 的屡次更新操做,能够合并执行,尽可能避免无效的 state 更新。
LRU cache:
在 Python worker 进程中维护了 state 读写的 cache。当读取某个 key 时,会先查看其是否已经被加载到读 cache 中;当更新某个 key 时,会先将其存放到写 cache 中。针对频繁读写的 key,LRU cache 能够避免每次读写操做,都访问远端的 state backend,对于有热点 key 的场景,能够极大提高 state 读写性能。
Flush on Checkpoint:
为了保证 checkpoint 语义的正确性,当 Java 算子须要执行 checkpoint时,会将 Python worker中的写 cache 都 flush 回 state backend。
其中 LRU cache 能够细分为二级,以下图所示:
说明:
工做流程:
经过前一节的介绍,咱们知道 PyFlink 使用了多种优化手段,用于提高 state 读写的性能,这些优化行为能够经过如下参数配置:
配置 | 说明 |
---|---|
python.state.cache-size | Python worker 中读 cache 以及写 cache 的大小。(二级 cache)须要注意的是:读 cache、写 cache是独立的,当前不支持分别配置读 cache 以及写 cache 的大小。 |
python.map-state.iterate-response-batch-size | 当遍历 MapState 时,每次从 state backend 读取并返回给 Python worker 的 entry 的最大个数。 |
python.map-state.read-cache-size | 一个 MapState 的读 cache 中最大容许的 entry 个数(一级 cache)。当一个 MapState 中,读 cache 中的 entry 个数超过该阈值时,会经过 LRU 策略从读 cache 中删除最近最少访问过的 entry。 |
python.map-state.write-cache-size | 一个 MapState 的写 cache 中最大容许的待更新 entry 的个数(一级 cache)。当一个 MapState 中,写 cache 中待更新的 entry 的个数超过该阈值时,会将该 MapState 下全部待更新 state 数据写回远端的 state backend。 |
须要注意的是,state 读写的性能不只取决于以上参数,还受其它因素的影响,好比:
输入数据中 key 的分布:
输入数据的 key 越分散,读 cache 命中的几率越低,则性能越差。
Python UDF 中 state 读写次数:
state 读写可能涉及到读写远端的 state backend,应该尽可能优化 Python UDF 的实现,减小没必要要的 state 读写。
checkpoint interval:
为了保证 checkpoint 语义的正确性,当遇到 checkpoint 时,Python worker 会将全部缓存的待更新 state 数据,写回 state backend。若是配置的 checkpoint interval 太小,则可能并不能有效减小 Python worker 写回 state backend 的数据量。
bundle size / bundle time:
当前 Python 算子会将输入数据划分红多个批次,发送给 Python worker 执行。当一个批次的数据处理完以后,会强制将 Python worker 进程中的待更新 state 写回 state backend。与 checkpoint interval 相似,该行为也可能会影响 state 写性能。批次的大小能够经过 python.fn-execution.bundle.size 和 python.fn-execution.bundle.time 参数控制。
除了 state 以外,用户还能够在 Python DataStream API 中使用定时器 timer。
import datetime from pyflink.common import Row, WatermarkStrategy from pyflink.common.typeinfo import Types from pyflink.common.watermark_strategy import TimestampAssigner from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext from pyflink.datastream.state import ValueStateDescriptor from pyflink.table import StreamTableEnvironment class CountWithTimeoutFunction(KeyedProcessFunction): def __init__(self): self.state = None def open(self, runtime_context: RuntimeContext): self.state = runtime_context.get_state(ValueStateDescriptor( "my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()]))) def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): # retrieve the current count current = self.state.value() if current is None: current = Row(value.f1, 0, 0) # update the state's count current[1] += 1 # set the state's timestamp to the record's assigned event time timestamp current[2] = ctx.timestamp() # write the state back self.state.update(current) # schedule the next timer 60 seconds from the current event time ctx.timer_service().register_event_time_timer(current[2] + 60000) def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'): # get the state for the key that scheduled the timer result = self.state.value() # check if this is an outdated timer or the latest timer if timestamp == result[2] + 60000: # emit the state on timeout yield result[0], result[1] class MyTimestampAssigner(TimestampAssigner): def __init__(self): self.epoch = datetime.datetime.utcfromtimestamp(0) def extract_timestamp(self, value, record_timestamp) -> int: return int((value[0] - self.epoch).total_seconds() * 1000) if __name__ == '__main__': env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) t_env.execute_sql(""" CREATE TABLE my_source ( a TIMESTAMP(3), b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10' ) """) stream = t_env.to_append_stream( t_env.from_path('my_source'), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()])) watermarked_stream = stream.assign_timestamps_and_watermarks( WatermarkStrategy.for_monotonous_timestamps() .with_timestamp_assigner(MyTimestampAssigner())) # apply the process function onto a keyed stream watermarked_stream.key_by(lambda value: value[1])\ .process(CountWithTimeoutFunction()) \ .print() env.execute()
在上述示例中,咱们定义了一个 KeyedProcessFunction,该 KeyedProcessFunction 记录每个 key 出现的次数,当一个 key 超过 60 秒没有更新时,会将该 key 以及其出现次数,发送到下游节点。
除了 event time timer 以外,用户还可使用 processing time timer。
timer 的工做流程是这样的:
须要注意的是:因为 timer 注册消息以及触发消息经过数据通道异步地在 Java 算子以及 Python worker 之间传输,这会形成在某些场景下,timer 的触发可能没有那么及时。好比当用户注册了一个 processing time timer,当 timer 触发以后,触发消息经过数据通道传输到 Python UDF 时,可能已是几秒中以后了。
在这篇文章中,咱们主要介绍了如何在 Python DataStream API 做业中使用 state & timer,state & timer 的工做原理以及如何进行性能调优。接下来,咱们会继续推出 PyFlink 系列文章,帮助 PyFlink 用户深刻了解 PyFlink 中各类功能、应用场景以及最佳实践等。
另外,阿里云实时计算生态团队长期招聘优秀大数据人才(包括实习 + 社招),咱们的工做包括: