PyODPS的使用
1:安装的前提条件
安装PyOPDS环境前,您的Python环境需要满足以下条件:
- setuptools 3.0或以上版本。
- requests 2.4.0或以上版本。
2:安装步骤
pip install setuptools>=3.0
pip install requests>=2.4.0
pip install greenlet>=0.4.10 # 可选,安装后能加速Tunnel上传。
pip install cython>=0.19.0 # 可选,不建议Windows用户安装。
# 执行如下命令安装PyODPS。
pip install pyodps
# 执行如下命令检查安装是否成功。
python -c "from odps import ODPS"
# 如果您使用的Python不是系统默认的Python版本,安装完PIP后,您可以执行如下命令进行Python版本切换。
/home/tops/bin/python2.7 -m pip install setuptools>=3.0
3:建议您安装以下工具,提升Tunnel上传的速度:
greenlet 0.4.10或以上版本。
cython 0.19.0或以上版本。
4:账号密码的设置
这个里面的
ACCESS_ID以及ACCESS_KEY是登陆dataworks的账号以及密码,一下图片指示中可以找到ACCESS_ID
5:代码运行pyodps的操作
from odps import ODPS
import sys
reload (sys)
#修改系统默认编码。数据中存在中文字符时需要执行此操作。
sys.setdefaultencoding('utf8')
o = ODPS(ACCESS_ID, ACCESS_KEY, DEFAULT_PROJECT, endpoint=END_POINT)
#以直接指定字段名以及字段类型的方式创建非分区表my_new_table。
table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
#向非分区表my_new_table中插入数据。
records = [[111, 'aaa'],
[222, 'bbb'],
[333, 'ccc'],
[444, '中文']]
o.write_table(table, records)
#读取非分区表my_new_table中的数据。
for record in o.read_table(table):
print record[0],record[1]
#以运行SQL的方式读取表中的数据。
result = o.execute_sql('select * from my_new_table;',hints={'odps.sql.allow.fullscan': 'true'})
#读取SQL执行结果。
with result.open_reader() as reader:
for record in reader:
print record[0],record[1]
#删除表以清除资源。
table.drop()
⚠️:pyodps的文档地址:https://help.aliyun.com/document_detail/90444.html?spm=a2c4g.11186623.2.12.4b47692dLw87uq
6:ODPS入口
DataWorks的PyODPS节点中,将会包含一个全局变量odps
或者o
,即为ODPS入口。您不需要手动定义ODPS入口。
print(o.exist_table('pyodps_iris'))
7:执行SQL
您可以在PyODPS节点中执行SQL,详情请参见执行SQL文档 。
Dataworks上默认未开启instance tunnel,即instance.open_reader默认使用Result接口(最多一万条记录)。您可以通过reader.count获取记录数。如果您需要迭代获取全部数据,则需要关闭limit限制。您可以通过下列语句在全局范围内打开Instance Tunnel并关闭limit限制。
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False # 关闭limit限制,读取全部数据。
with instance.open_reader() as reader:
# 通过instance tunnel可读取全部数据。
您也可以通过在open_reader上添加tunnel=True
,实现仅对本次open_reader开启instance tunnel。同时,您还可以添加 limit=False
,实现仅对本次关闭limit限制。
with instance.open_reader(tunnel=True, limit=False) as reader:
# 本次open_reader使用instance tunnel接口,且能读取全部数据。
说明 若您未开启instance tunnel,可能导致获取数据格式错误,解决方法请参见Python SDK。
8:DataFrame
- 执行
在DataWorks的环境里,DataFrame的执行需要显式调用立即执行的方法(如execute,head等) 。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 调用立即执行的方法,处理每条record。
如果您需要在print时调用立即执行,需要开启
options.interactive
。from odps import options from odps.df import DataFrame options.interactive = True # 在开始处打开开关。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # print时会立即执行。
- 打印详细信息
通过设置
options.verbose
选项。在DataWorks上,默认已经处于打开状态,运行过程会打印Logview等详细过程。
9:获取调度参数
与DataWorks中的SQL节点不同,为了避免影响代码,PyODPS节点不会在代码中替换 ${param_name}这样的字符串,而是在执行代码前,在全局变量中增加一个名为args
的dict,调度参数可以在此获取。例如,在节点基本属性 > 参数中设置ds=${yyyymmdd}
,则可以通过以下方式在代码中获取该参数。
print('ds=' + args['ds'])
ds=20161116
说明 如果您需要获取名为ds=${yyyymmdd}
的分区,则可以使用如下方法。
o.get_table('table_name').get_partition('ds=' + args['ds'])
10:使用限制
- PyODPS节点底层的Python版本为2.7。
- PyODPS节点获取本地处理的数据不能超过50MB,节点运行时占用内存不能超过1G,否则节点任务会被系统中止。请避免在PyODPS任务中写额外的Python数据处理代码。
- 在DataWorks上编写代码并进行调试效率较低,为提升运行效率,建议本地安装IDE进行代码开发。
- 在DataWorks上使用PyODPS时,为了防止对DataWorks的Gate Way造成压力,对内存和CPU都有限制,该限制由DataWorks统一管理。如果您发现有Got killed报错,即表名内存使用超限,进程被中止。因此,请尽量避免本地的数据操作。通过PyODPS发起的SQL和DataFrame任务(除to_pandas外)不受此限制。
- 由于缺少matplotlib等包,如下功能可能受限:
- DataFrame的plot函数。
- DataFrame自定义函数需要提交到MaxCompute执行。由于Python沙箱限制,第三方库只支持所有的纯粹Python库以及Numpy,因此不能直接使用Pandas。
- DataWorks中执行的非自定义函数代码可以使用平台预装的Numpy和Pandas。其他带有二进制代码的三方包不被支持。
- 由于兼容性原因,在DataWorks中,options.tunnel.use_instance_tunnel默认设置为False。如果需要全局开启instance tunnel,需要手动将该值设置为True。
- 由于实现的原因,Python的atexit包不被支持,请使用try-finally结构实现相关功能。
上一篇: JVM学习笔记之三