简介:本文介绍了PyFlink项目的目标和发展历程,以及PyFlink目前的核心功能,包括Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依赖管理和Python UDF执行优化,同时也针对功能展现了相关demo。
做者|付典python
本文介绍了PyFlink项目的目标和发展历程,以及PyFlink目前的核心功能,包括Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依赖管理和Python UDF执行优化,同时也针对功能展现了相关demo。本文主要分为4个部分:git
- PyFlink介绍
- PyFlink相关功能
- PyFlink功能演示
- PyFlink下一步规划
PyFlink是Flink的一个子模块,也是整个Flink项目的一部分,主要目的是提供Flink的Python语言支持。由于在机器学习和数据分析等领域,Python语言很是重要,甚至是最主要的开发语言。因此,为了知足更多用户需求,拓宽Flink的生态,咱们启动了PyFlink项目。github
PyFlink项目的目标主要有两点,第一点是将Flink的计算能力输出给Python用户,也就是咱们会在Flink中提供一系列的Python API,方便对Python语言比较熟悉的用户开发Flink做业。docker
第二点,就是将Python生态基于Flink进行分布式化。虽然咱们会在Flink中提供一系列的Python API来给Python用户来使用,但这对用户来讲是有学习成本的,由于用户要学习怎么使用Flink的Python API,了解每个API的用途。因此咱们但愿用户能在API层使用他们比较熟悉的 Python库的API,可是底层的计算引擎使用Flink,从而下降他们的学习成本。这是咱们将来要作的事情,目前处于启动阶段。apache
下图是PyFlink项目的发展状况,目前发布了3个版本,支持的内容也愈来愈丰富。api
咱们主要介绍PyFlink如下功能,Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依赖管理和Python UDF执行优化。数据结构
Python Table API的目的是为了让用户可使用Python语言来开发Flink做业。Flink里面有三种类型的API,Process、Function和Table API,前二者是较为底层的API,基于Process和Function开发的做业,其逻辑会严格按照用户定义的行为进行执行,而Table API是较为高层的API,基于Table API开发的做业,其逻辑会通过一系列的优化以后进行执行。架构
Python Table API,顾名思义就是提供 Table API的Python语言支持。并发
如下是Python Table API开发的一个Flink做业,做业逻辑是读取文件,计算word count,而后再把计算结果写到文件中去。这个例子虽然简单,但包括了开发一个Python Table API做业的全部基本流程。机器学习
首先咱们须要定义做业的执行模式,好比说是批模式仍是流模式,做业的并发度是多少?做业的配置是什么。接下来咱们须要定义source表和sink表,source表定义了做业的数据源来源于哪里,数据的格式是什么;sink表定义了做业的执行结果写到哪里去,数据格式是什么。最后咱们须要定义做业的执行逻辑,在这个例子中是计算写过来的count。
如下是Python Table API的部分截图,能够看到它的数量和功能都比较齐全。
Python Table API是一种关系型的API,其功能能够类比成SQL,而SQL里自定义函数是很是重要的功能,能够极大地扩展SQL的使用范围。Python UDF的主要目的就是容许用户使用Python语言来开发自定义函数,从而扩展Python Table API的使用场景。同时,Python UDF除了能够用在Python Table API做业中以外,还能够用在Java Table API做业以及SQL做业中。
在PyFlink中咱们支持多种方式来定义Python UDF。用户能够定义一个Python类,继承ScalarFunction,也能够定义一个普通的Python函数或者Lambda函数,实现自定义函数的逻辑。除此以外,咱们还支持经过Callable Function和Partial Function定义Python UDF。用户能够根据本身的须要选择最适合本身的方式。
PyFlink里面提供了多种Python UDF的使用方式,包括Python Table API、Java table API和SQL,咱们一一介绍。
在Python Table API中使用Python UDF,在定义完Python UDF以后,用户首先须要注册Python UDF,能够调用table environment register来注册,而后命名,而后就能够在做业中经过这个名字来使用 Python UDF了。
在Java Table API中它的使用方式也比较类似,可是注册方式不同,Java Table API做业中须要经过DDL语句来进行注册。
除此以外,用户也能够在SQL的做业中使用Python UDF。与前面两种方式相似,用户首先须要注册Python UDF,能够在SQL脚本中经过DDL语句来注册,也能够在SQL Client的环境配置文件里面注册。
简单介绍下Python UDF的执行架构。Flink是用Java语言编写的,运行在Java虚拟机中,而Python UDF运行在 Python虚拟机中,因此Java进程和Python进程须要进行数据通讯。 除此以外,二者间还须要传输state、log、metrics,它们的传输协议须要支持4种类型。
向量化Python UDF的主要目的是使 Python用户能够利用Pandas或者Numpy等数据分析领域经常使用的Python库,开发高性能的Python UDF。
向量化Python UDF是相对于普通Python UDF而言的,咱们能够在下图看到二者的区别。
下图显示了向量化Python UDF的执行过程。首先在Java端,Java在攒完多条数据以后会转换成Arrow格式,而后发送给Python进程。Python进程在收到数据以后,将其转换成Pandas的数据结构,而后调用用户自定义的向量化Python UDF。同时向量化Python UDF的执行结果会再转化成Arrow格式的数据,再发送给 Java进程。
在使用方式上,向量化Python UDF与普通Python UDF是相似的,只有如下几个地方稍有不一样。首先向量化Python UDF的声明方式须要加一个UDF type,声明这是一个向量化Python UDF,同时UDF的输入输出类型是Pandas Series。
前面咱们提到 Python UDF有多种定义方式,可是若是须要在Python UDF中使用Metrics,那么Python UDF必须继承ScalarFunction来进行定义。在Python UDF的 open方法里面提供了一个Function Context参数,用户能够经过Function Context参数来注册Metrics,而后就能够经过注册的 Metrics对象来汇报了。
从类型来讲,PyFlink依赖主要包括如下几种类型,普通的PyFlink文件、存档文件,第三方的库、PyFlink解释器,或者Java的Jar包等等。从解决方案来看,针对每种类型的依赖,PyFlink提供了两种解决方案,一种是API的解决方案,一种是命令行选项的方式,你们选择其一便可。
Python UDF的执行优化主要包括两个方面,执行计划优化和运行时优化。它与SQL很是像,一个包含Python UDF的做业,首先会通过预先定义的规则,生成一个最优的执行计划。在执行计划已经肯定的状况下,在实际执行的时候,又能够运用一些其余的优化手段来达到尽量高的执行效率。
执行计划的优化主要有如下几个优化思路。一个是不一样类型的 UDF的拆分,因为在一个节点中可能同时包含多种类型的UDF,而不一样的类型的UDF是不能放在一块执行的;第二个方面是Filter下推,其主要目的是尽量下降含有Python UDF节点的输入数据量,从而提高整个做业的执行性能;第三个优化思路是Python UDF Chaining,Java进程与Python进程之间的通讯开销以及序列化反序列化开销比较大,而Python UDF Chaining能够尽可能减小Java进程和Python进程之间的通讯开销。
假若有这样一个做业,它包含了两个UDF,其中add是Python UDF, subtract是向量化Python UDF。默认状况下,这个做业的执行计划会有一个project节点,这两个 UDF同时位于这一project的节点里面。这个执行计划的主要问题是,普通Python UDF每次处理一条数据,而向量化Python UDF,每次处理多条数据,因此这样的一个执行计划是没有办法执行的。
可是经过拆分,咱们能够把这一个project的节点拆分红了两个project的节点,其中第一个project的节点只包含普通Python UDF,而第二个节点只包含向量化Python UDF。不一样类型的Python UDF拆分到不一样的节点以后,每个节点都只包含了一种类型的UDF,因此算子就能够根据它所包含的UDF的类型选择最合适的执行方式。
Filter下推的主要目的是将过滤算子下推到Python UDF节点以前,尽可能减小Python UDF节点的数据量。
假如咱们有这样一个做业,做业原始执行计划里面包括了两个Project的节点,一个是add、 subtract,同时还包括一个Filter节点。这个执行计划是能够运行的,但须要更优化。能够看到,由于Python的节点位于Filter节点以前,因此在Filter节点以前Python UDF已经计算完了,可是若是把Filter过滤下,推到Python UDF以前,那么就能够大大下降Python UDF节点的输入数据量。
假如咱们有这样一个做业,里面包含两种类型的UDF,一个是add,一个是subtract,它们都是普通的Python UDF。在一个执行计划里面包含两个project的节点,其中第一个project的节点先算subtract,而后再传输给第二个project节点进行执行。
它的主要问题是,因为subtract和add位于两个不一样的节点,其计算结果须要从Python发送回Java,而后再由Java进程发送给第二个节点的Python进行执行。至关于数据在Java进程和Python进程之间转了一圈,因此它带来了彻底没有必要的通讯开销和序列化反序列化开销。所以,咱们能够将执行计划优化成右图,就是将add节点和subtract节点放在一个节点中运行,subtract节点的结果计算出来以后直接去调用add节点。
目前提升Python UDF运营时的执行效率有三种:一是Cython优化,用它来提升Python代码的执行效率;二是自定义Java进程和Python进程之间的序列化器和反序列化器,提升序列化和反序列化效率;三是提供向量化Python UDF功能。
首先你们打开这个页面,里面提供了PyFlink的一些demo,这些demo是运行在docker里面的,因此你们若是要运行这些demo就须要在本机安装docker环境。
随后,咱们能够运行命令,命令会启动一个PyFlink的集群,后面咱们运行的PyFlink的例子都会提交到集群去执行。
第一个例子是word count,咱们首先在里面定义了环境、source、sink等,咱们能够运行一下这个做业。
这是做业的执行结果,能够看到Flink这个单词出现了两次,PyFlink这个单词出现了一次。
接下来再运行一个Python UDF的例子。这个例子和前面有一些相似,首先咱们定义它使用PyFlink,运行在批这种模式下,同时做业的并发度是1。不同的地方是咱们在做业里定义了一个UDF,它的输入包括两个列,都是Bigint类型,并且它输出类型也是对应的。这个UDF的逻辑是把这两个列的相加做为一个结果输出。
咱们执行一下做业,执行结果是3。
接下来咱们再运行一个带有依赖的Python UDF。前面做业的UDF是不包含任何依赖的,直接就把两个输入列相加起来。而在这个例子里,UDF引用了一个第三方的依赖,咱们能够经过API set python requirement来执行。
接下来咱们运行做业,它的执行结果和前面是同样的,由于这两个做业的逻辑是相似的。
接下来咱们再看一个向量化Python UDF的例子。在 UDF定义的时候,咱们加了一个UDF的type字段,说明说咱们是一个向量化的Python UDF,其余的逻辑和普通Python UDF的逻辑相似。最后它的执行结果也是3,由于它的逻辑和前面是同样的,计算两页的之和。
咱们再来看一个例子,在Java的Table做业里面使用Python。在这个做业里面咱们又会用到一个Python UDF,它经过DDL语句进行注册,而后在execute SQL语句里面进行使用。
接下来咱们再看在纯SQL做业中使用Python UDF的例子。在资源文件里面咱们声明了一个UDF,名字叫add1,它的类型是Python,同时咱们也能看到它的UDF位置。
接下来咱们运行它,执行结果是234。
目前PyFlink只支持了Python Table API,咱们计划在下一个版本中支持DataStream API,同时也会支持Python UDAF以及Pandas UDAF,另外,在执行层也会持续优化PyFlink的执行效率。
这是一些资源的连接,包括PyFlink的文档地址。
https://ci.apache.org/projects/flink/flink-docs-master/api/python/
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/
https://github.com/pyflink/playgrounds/tree/1.11
好的,咱们今天的分享就到这里了,欢迎你们继续关注咱们的课程。
活动推荐:
仅需99元便可体验阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版!点击下方连接了解活动详情:https://www.aliyun.com/product/bigdata/sc?utm\_content=g\_1000250506
本文内容由阿里云实名注册用户自发贡献,版权归原做者全部,阿里云开发者社区不拥有其著做权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。若是您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将马上删除涉嫌侵权内容。