PyODPS DataFrame 处理笛卡尔积的几种方式

PyODPS 提供了 DataFrame API 来用相似 pandas 的接口进行大规模数据分析以及预处理,本文主要介绍如何使用 PyODPS 执行笛卡尔积的操做。python

笛卡尔积最常出现的场景是两两之间须要比较或者运算。以计算地理位置距离为例,假设大表 Coordinates1 存储目标点经纬度坐标,共有 M 行数据,小表 Coordinates2 存储出发点经纬度坐标,共有 N 行数据,如今须要计算全部离目标点最近的出发点坐标。对于一个目标点来讲,咱们须要计算全部的出发点到目标点的距离,而后找到最小距离,因此整个中间过程须要产生 M * N 条数据,也就是一个笛卡尔积问题。git

haversine 公式
首先简单介绍一下背景知识,已知两个地理位置的坐标点的经纬度,求解两点之间的距离可使用 haversine 公式,使用 Python 的表达以下:app

def haversine(lat1, lon1, lat2, lon2):函数

#  lat1,  lon1  为位置  1  的经纬度坐标
    #  lat2,  lon2  为位置  2  的经纬度坐标
    import  numpy  as  np

    dlon  =  np.radians(lon2  -  lon1)
    dlat  =  np.radians(lat2  -  lat1)
    a  =  np.sin(  dlat  /2  )  **2  +  np.cos(np.radians(lat1))  *  np.cos(np.radians(lat2))  *  np.sin(  dlon  /2  )  **2
    c  =  2  *  np.arcsin(np.sqrt(a))
    r  =  6371  #  地球平均半径,单位为千米
    return  c  *  r

MapJoin
目前最推荐的方法就是使用 mapjoin,PyODPS 中使用 mapjoin 的方式十分简单,只须要两个 dataframe join 时指定 mapjoin=True,执行时会对右表作 mapjoin 操做。性能

In [3]: df1 = o.get_table('coordinates1').to_df() code

In [4]: df2 = o.get_table('coordinates2').to_df() 对象

In [5]: df3 = df1.join(df2, mapjoin=True) 接口

In [6]: df1.schema
Out[6]:
odps.Schema {ip

latitude                    float64              
longitude                  float64              
id                                string

}资源

In [7]: df2.schema
Out[7]:
odps.Schema {

latitude                    float64              
longitude                  float64              
id                                string

}

In [8]: df3.schema
Out[8]:
odps.Schema {

latitude_x                        float64              
longitude_x                      float64              
id_x                                    string                
latitude_y                        float64              
longitude_y                      float64              
id_y                                    string

}
能够看到在执行 join 时默认会将重名列加上 _x 和 _y 后缀,可经过在 suffixes 参数中传入一个二元 tuple 来自定义后缀,当有了 join 以后的表后,经过 PyODPS 中 DataFrame 的自建函数就能够计算出距离,十分简洁明了,而且效率很高。

In [9]: r = 6371

...:  dis1  =  (df3.latitude_y  -  df3.latitude_x).radians()  
  ...:  dis2  =  (df3.longitude_y  -  df3.longitude_x).radians()  
  ...:  a  =  (dis1  /  2).sin()  **  2  +  df3.latitude_x.radians().cos()  *  df3.latitude_y.radians().cos()  *  (dis2  /  2).sin()  **  2  
  ...:  df3['dis']  =  2  *  a.sqrt().arcsin()  *  r

In [12]: df3.head(10)
Out[12]:

latitude_x  longitude_x id_x  latitude_y   longitude_y id_y       dis

0 76.252432 59.628253 0 84.045210 6.517522 0 1246.864981
1 76.252432 59.628253 0 59.061796 0.794939 1 2925.953147
2 76.252432 59.628253 0 42.368304 30.119837 2 4020.604942
3 76.252432 59.628253 0 81.290936 51.682749 3 584.779748
4 76.252432 59.628253 0 34.665222 147.167070 4 6213.944942
5 76.252432 59.628253 0 58.058854 165.471565 5 4205.219179
6 76.252432 59.628253 0 79.150677 58.661890 6 323.070785
7 76.252432 59.628253 0 72.622352 123.195778 7 1839.380760
8 76.252432 59.628253 0 80.063614 138.845193 8 1703.782421
9 76.252432 59.628253 0 36.231584 90.774527 9 4717.284949

In [13]: df1.count()
Out[13]: 2000

In [14]: df2.count()
Out[14]: 100

In [15]: df3.count()
Out[15]: 200000
df3 已是有 M * N 条数据了,接下来若是须要知道最小距离,直接对 df3 调用 groupby 接上 min 聚合函数就能够获得每一个目标点的最小距离。

In [16]: df3.groupby('id_x').dis.min().head(10)
Out[16]:

dis_min

0 323.070785
1 64.755493
2 1249.283169
3 309.818288
4 1790.484748
5 385.107739
6 498.816157
7 615.987467
8 437.765432
9 272.589621
DataFrame 自定义函数
若是咱们须要知道对应最小距离的点的城市,也就是表中对应的 id ,能够在 mapjoin 以后调用 MapReduce,不过咱们还有另外一种方式是使用 DataFrame 的 apply 方法。要对一行数据使用自定义函数,可使用 apply 方法,axis 参数必须为 1,表示在行上操做。

表资源
要注意 apply 是在服务端执行的 UDF,因此不能在函数内使用相似于df=o.get_table('table_name').to_df() 的表达式去得到表数据,具体原理能够参考PyODPS DataFrame 的代码在哪里跑。以本文中的状况为例,要想将表 1 与表 2 中全部的记录计算,那么须要将表 2 做为一个资源表,而后在自定义中引用该表资源。PyODPS 中使用表资源也十分方便,只须要将一个 collection 传入 resources 参数便可。collection 是个可迭代对象,不是一个 DataFrame 对象,不能够直接调用 DataFrame 的接口,每一个迭代值是一个 namedtuple,能够经过字段名或者偏移来取对应的值。

use dataframe udf

df1 = o.get_table('coordinates1').to_df()
df2 = o.get_table('coordinates2').to_df()

def func(collections):

import pandas as pd

collection = collections[0]

ids = []
latitudes = []
longitudes = []
for r in collection:
    ids.append(r.id)
    latitudes.append(r.latitude)
    longitudes.append(r.longitude)

df = pd.DataFrame({'id': ids, 'latitude':latitudes, 'longitude':longitudes})
def h(x):        
    df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)
    return df.iloc[df['dis'].idxmin()]['id']
return h

df1[df1.id, df1.apply(func, resources=[df2], axis=1, reduce=True, types='string').rename('min_id')].execute(

libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])

在自定义函数中,将表资源经过循环读成 pandas DataFrame,利用 pandas 的 loc 能够很方便的找到最小值对应的行,从而获得距离最近的出发点 id。另外,若是在自定义函数中须要使用到三方包(例如本例中的 pandas)能够参考这篇文章。

全局变量
当小表的数据量十分小的时候,咱们甚至能够将小表数据做为全局变量在自定义函数中使用。

df1 = o.get_table('coordinates1').to_df()
df2 = o.get_table('coordinates2').to_df()
df = df2.to_pandas()

def func(x):

df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)
return df.iloc[df['dis'].idxmin()]['id']

df1[df1.id, df1.apply(func, axis=1, reduce=True, types='string').rename('min_id')].execute(

libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])

在上传函数的时候,会将函数内使用到的全局变量(上面代码中的 df) pickle 到 UDF 中。可是注意这种方式使用场景很局限,由于 ODPS 的上传的文件资源大小是有限制的,因此数据量太大会致使 UDF 生成的资源太大从而没法上传,并且这种方式最好保证三方包的客户端与服务端的版本一致,不然颇有可能出现序列化的问题,因此建议只在数据量很是小的时候使用。

总结使用 PyODPS 解决笛卡尔积的问题主要分为两种方式,一种是 mapjoin,比较直观,性能好,通常能用 mapjoin 解决的咱们都推荐使用 mapjoin,而且最好使用内建函数计算,能到达最高的效率,可是它不够灵活。另外一种是使用 DataFrame 自定义函数,比较灵活,性能相对差一点(可使用 pandas 或者 numpy 得到性能上的提高),经过使用表资源,将小表做为表资源传入 DataFrame 自定义函数中,从而完成笛卡尔积的操做。

相关文章
相关标签/搜索