摘要: 昨天,DataWorks推出了PYODPS任务类型,集成了Maxcompute的Python SDK,可在DataWorks的PYODPS节点上直接编辑Python代码操做Maxcompute,也能够设置调度任务来处理数据,提升数据开发效率。html
昨天,DataWorks推出了PYODPS任务类型,集成了Maxcompute的Python SDK,可在DataWorks的PYODPS节点上直接编辑Python代码操做Maxcompute,也能够设置调度任务来处理数据,提升数据开发效率。sql
效果以下图app
只有华东2(上海)region 支持了 PYODPS 节点。异步
注:底层的 Python 版本为 2.7 。工具
新建 PYODPS 节点具体操做以下:测试
1) 单击数据开发页面工具栏中的 新建 > 新建任务。2) 填写新建任务弹出框中的各配置项。spa
3) 单击建立code
DataWorks 的 PyODPS 节点中,将会包含一个全局的变量 odps 或者 o ,即 ODPS 入口。用户不须要手动定义 ODPS 入口。htm
print(odps.exist_table('pyodps_iris'))
PyODPS支持ODPS SQL的查询,并能够读取执行的结果。 execute_sql 或者 run_sql 方法的返回值是 运行实例 。对象
注解:并不是全部在 ODPS Console 中能够执行的命令都是 ODPS 能够接受的 SQL 语句。 在调用非 DDL / DML 语句时,请使用其余方法,例如 GRANT / REVOKE 等语句请使用 run_security_query 方法,PAI 命令请使用 run_xflow 或 execute_xflow 方法。
>>> o.execute_sql('select * from dual') # 同步的方式执行,会阻塞直到SQL执行完成 >>> >>> instance = o.run_sql('select * from dual') # 异步的方式执行 >>> print(instance.get_logview_address()) # 获取logview地址 >>> instance.wait_for_success() # 阻塞直到完成
有时,咱们在运行时,须要设置运行时参数,咱们能够经过设置 hints 参数,参数类型是dict。
>>> o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
咱们能够对于全局配置设置sql.settings后,每次运行时则都会添加相关的运行时参数。
>>> from odps import options >>> options.sql.settings = {'odps.sql.mapper.split.size': 16} >>> o.execute_sql('select * from pyodps_iris') # 会根据全局配置添加hints
运行 SQL 的 instance 可以直接执行 open_reader 的操做,一种状况是SQL返回告终构化的数据。
>>> with o.execute_sql('select * from dual').open_reader() as reader: >>> for record in reader: >>> # 处理每个record
另外一种状况是 SQL 可能执行的好比 desc,这时经过 reader.raw 属性取到原始的SQL执行结果。
>>> with o.execute_sql('desc dual').open_reader() as reader: >>> print(reader.raw)
PYODPS节点使用调度参数须要注意一下,系统定义的调度参数,能够直接经过此方法获取。
在全局包括一个 args 对象,能够在这个中获取,它是一个dict类型。
测试运行结果以下:
请注意:在数据开发下,使用了自定义调度参数,页面上直接触发运行PYODPS节点时,须要写死时间,PYODPS节点没法像SQL同样直接替换。
调度请参考:https://help.aliyun.com/document_detail/30298.html
本文为云栖社区原创内容,未经容许不得转载