在使用 PyODPS DataFrame 编写数据应用时,尽管编写的是同一个脚本文件,但其中的代码会在不一样位置执行,这可能致使一些没法预期的问题,本文介绍当出现相关问题时,如何肯定代码在何处执行,以及提供部分场景下解决问题的方法。html
概述
假定咱们要执行下面的代码:后端
from odps import ODPS, options
import numpy as np服务器
o = ODPS(access_id, access_key, project, endpoint)
df = o.get_table('pyodps_iris').to_df()网络
coeffs = [0.1, 0.2, 0.4]闭包
def handle(v):app
import numpy as np return float(np.cosh(v)) * sum(coeffs)
options.df.supersede_libraries = True
val = df.sepal_length.map(handle).sum().execute(libraries=['numpy.zip', 'other.zip'])
print(np.sinh(val))
在开始分析以前,首先须要指出的是,PyODPS 是一个 Python 包而非 Python Implementation,PyODPS 的运行环境均为未经修改的 Python,于是并不会出现与正常 Python 解释器不一致的行为。亦即,你所写的每一条语句不会有与标准 Python 语句不一样的行为,例如自动变成分布式代码,等等。分布式
下面解释该代码的执行过程。函数
上图是执行上述代码时可能涉及的系统。代码自己执行的位置在图中用紫色表示,这些系统都位于 MaxCompute 外部,为了方便表述,下文称为“本地”。在本地执行的代码包括 handle 函数以外的部分(注意 handle 传入 map 时仅传入了函数自己而并未执行)。于是,这些代码在执行时,行为与普通 Python code 的执行行为相似,import 第三方包时,引用的是本地的包。于是,上面的代码中,libraries=['numpy.zip', 'other.zip']引用的other.zip由于并无在本地安装,于是若是代码中有诸如 import other 这样的语句,会致使执行报错。即使 other.zip 已被上传到 MaxCompute 资源也是如此,由于本地根本没有这个包。理论上,本地代码若是不涉及 PyODPS 包,则与 PyODPS 无关,用户须要自行排查。spa
对于 handle 函数,状况发生了变化。handle 函数传入 map 方法时,若是使用的后端是 MaxCompute 后端,会先被 cloudpickle 模块提取闭包和字节码,此后 PyODPS DataFrame 会使用闭包和字节码生成一个 Python UDF,提交到 MaxCompute。最后,做业以 SQL 的形式在 MaxCompute 执行时,会调用这个 Python UDF,其中的字节码和闭包内容会被 unpickle,此后在 MaxCompute Executor 执行。因而可知,在上述代码中,调试
在 handle 函数体中的代码都不会在本地执行,而会在 MaxCompute Executor 中执行;
handle 函数体中没法引用本地安装的包,只有在 MaxCompute Executor 中存在的包才有效;
上传的第三方包必须可以在 MaxCompute Executor 中的 Python 版本(目前为 Python 2.7,UCS2)中调用;
handle 函数体中修改引用的外部变量(上述代码中的 coeffs)不会致使本地的 coeffs 值被修改;
若是在 handle 中引用在 handle 外 import 的包,在 handle 中调用可能会报错,由于在不一样环境中,包的结构可能不一样,而 cloudpickle 会将本地包的引用带到 MaxCompute Executor,致使报错,于是建议 import 在 handle 中进行;
因为使用 cloudpickle,若是在 handle 中调用了其余文件中的代码,该文件所在的包必须存在于 MaxCompute Executor 中。若是你不想使用第三方包的形式解决该问题,请将全部引用的我的代码放在同一个文件中。
上述对 handle 函数的解释对于自定义聚合、apply 和 map_reduce 中调用的自定义方法 / Agg 类均适用。若是使用的后端是 Pandas 后端,则全部代码都会在本地运行,于是本地也须要安装相关的包。但鉴于 Pandas 后端调试完毕后一般会转移到 MaxCompute 运行,建议在本地装包的同时,参照 MaxCompute 后端的惯例进行开发。
使用第三方包
我的电脑 / 自有服务器在本地使用第三方包 / 其余文件中的代码
在相应的 Python 版本上安装便可。
DataWorks 中本地使用其余文件中的代码
该部分功能由 DataWorks 提供,请参考 DataWorks 文档。
map / apply / map_reduce / 自定义聚合中使用第三方包 / 其余文件中的代码
参考 https://yq.aliyun.com/article... 。须要补充的是,在 DataWorks 上上传资源后,须要点击“提交”确保资源被正确上传到 MaxCompute。若是须要使用本身的 Numpy 版本,在上传正确版本的 wheel 包的同时,须要配置 odps.df.supersede_libraries = True,同时确保你上传的 numpy 包名位于 libraries 的最前面,若是指定了 options.df.libraries,则 numpy 包名须要位于 options.df.libraries 的最前面。
引用其余 MaxCompute 表中的数据
我的电脑 / 自有服务器在本地访问 MaxCompute 表
若是 Endpoint 能够链接,使用 PyODPS / DataFrame 访问。
map / apply / map_reduce / 自定义聚合中访问其余 MaxCompute 表
MaxCompute Executor 中一般不支持访问 Endpoint / Tunnel Endpoint,其上也没有 PyODPS 包可用,于是不能直接使用 ODPS 入口对象或者 PyODPS DataFrame,也不能从自定义函数外部传入这些对象。若是表的数据量不大,建议将 DataFrame 做为资源传入(见 https://pyodps.readthedocs.io... )。若是数据量较大,建议改写成 join。
访问其余服务
我的电脑 / 自有服务器在本地访问其余服务
保证本身的环境中能够正常访问相关服务,生产服务器能够联系 PE。
DataWorks 上的本地代码中访问其余服务
请咨询 DataWorks。
map / apply / map_reduce / 自定义聚合中访问其余服务
参考 https://yq.aliyun.com/article... 启用 Isolation,若是仍然遇到网络报错,请联系 MaxCompute 用户群,找 PE 帮忙解决。