Pilosa文档翻译(三)示例

[TOC]html

原文地址:https://www.pilosa.com/docs/latest/examples/ 位图数据库:Pilosa查询十亿级出租车搭乘数据案例python

简单说明 Introduction

纽约市发布了一个很是详细的包含了超过10亿条出租车搭乘数据的集合。该数据已经成为科技博客分析的热门目标,而且已经获得了很好的研究。出于这个缘由,咱们认为将这些数据导入Pilosa,以便肯定同一数据集状况下与其余数据存储和技术进行比较。linux

通常来讲,传输(Transportation)是Pilosa的值得关注的用例,由于它一般涉及多个不一样的数据源,以及高速率,实时和极大量的数据(特别是若是想得出合理的结论)。git

咱们编写了一个工具来帮助将NYC(纽约市)出租车数据导入Pilosa这个工具是PDK(Pilosa开发工具包)的一部分,并利用了许多可重复使用的模块,这些模块也能够帮助您导入其余数据。 接下来,咱们将逐步解释整个过程。github

初始设置以后,PDK导入工具会执行咱们定义Pilosa架构所需的一切,相应地将数据映射到位图,而后将其导入Pilosa数据库

数据模型 Data Model

纽约出租车数据由如下列出的许多csv文件组成:http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml。 这些数据文件大约有20列,其中大约一半与咱们正在研究的基准查询相关:json

  • 距离(Distance): miles(英里), floating point(浮点值)
  • 车费(Fare): dollars(美圆), floating point(浮点值)
  • 乘客人数(Number of passengers): integer(整数值)
  • 下车位置(Dropoff location): latitude and longitude(经纬度), floating point(浮点值)
  • 上车位置(Pickup location): latitude and longitude(经纬度), floating point(浮点值)
  • 下车时间(Dropoff time): timestamp(时间戳)
  • 上车时间(Pickup time): timestamp(时间戳)

注意:下面表格中的row ID是指记录的取值范围,不要理解成MySQL等数据库的rowID。 咱们导入这些字段,从每一个字段建立一个或多个Pilosa字段:架构

字段(Field) 映射(Mapping)
cab_type(出租车类型) 直接映射整数枚举值 → row ID
dist_miles(距离) 四舍五入round(dist) → row ID
total_amount_dollars(总金额) 四舍五入round(dist) → row ID
passenger_count(乘车人数) 直接映射整数值 → row ID
drop_grid_id(下车位置网格ID) (lat, lon) → 100x100矩形分割网格 → cell(格子) ID
drop_year 年份year(timestamp) → row ID
drop_month 月份month(timestamp) → row ID
drop_day 该月第几天day(timestamp) → row ID
drop_time(下车时间) 该天中的时间映射到48个半小时组成的桶中
pickup_grid_id(下车位置网格ID) (lat, lon) → 100x100矩形分割网格 → cell ID
pickup_year year(timestamp) → row ID
pickup_month month(timestamp) → row ID
pickup_day day(timestamp) → row ID
pickup_time(上车时间) time of day mapped to one of 48 half-hour buckets → row ID

咱们还建立了两个附加字段表示持续时间和每一次乘坐的平均速度:app

字段(Field) 映射(Mapping)
duration_minutes(持续时间) round(drop_timestamp - pickup_timestamp) → row ID
speed_mph() round(dist_miles ÷ (drop_timestamp - pickup_timestamp)) → row ID

映射Mapping

咱们要使用的每一个列(column)都必须根据某些规则映射到字段(fields)row ID的组合。 有不少方法能够实现这个映射,出租车数据集为咱们提供了一个可能性的很好的描述。函数

0列(colums) --> 1字段(field)

cab_type: 每一行表示一个出租车的类型,在一行数据中有一些bit位来表示一次乘车中使用的出租车类型。 这是一个简单的枚举映射,例如黄色yellow=0,绿色green=1等。这个字段值的位宽(bits)由数据源肯定。也就是说,咱们有几个数据来源(NYC出租车-黄色, NYC出租车-绿色,Uber汽车),对于每个数据来源,要设置不一样的cab_type常量值。

1列(colums) --> 1字段(field)

如下三个字段以简单的直接方式从原始数据的单个列进行映射。 dist_miles:每一行值(row ID)表示乘车的距离,这个映射关系很简单:例如行值1(row 1)表示乘车距离在[0.5,1.5]这个区间内。也就是说,咱们将距离这个浮点值舍入为整数并将其直接用做row ID。一般,从浮点值到row ID的映射能够是任意的。 舍入映射实现简洁,简化了导入和分析。而且,它是人类可读的。 咱们会看到这种模式屡次使用。

PDK使用中,咱们定义了一个Mapper,它是一个只返回整数row ID的函数。 PDK具备许多预约义的映射器,可使用一些参数进行描述。 其中之一是LinearFloatMapper。在下面代码中它将线性函数应用于输入,并将其转换为整数,所以隐式处理舍入。

lfm := pdk.LinearFloatMapper{
    Min: -0.5,
    Max: 3600.5,
    Res: 3601,
}

MinMax定义线性函数,Res肯定输出row ID的最大容许值。咱们选择这些值以产生一个舍入到最接近的整数的行为。其余预约义的映射器也有本身的特定参数,一般是两个或三个。

这个映射函数(mapping function)是核心操做,但咱们须要一些其余部分来定义整个过程,它封装在BitMapper对象中。此对象定义要使用的输入数据源的哪些字段(Field),如何解析它们(Parsers),要使用的映射(Mapper)以及要使用的字段的名称(Frame)。TODO更新,这是合理的。

pdk.BitMapper{
    Frame:   "dist_miles",
    Mapper:  lfm,
    Parsers: []pdk.Parser{pdk.FloatParser{}},
    Fields:  []int{fields["trip_distance"]},
},

这些相同的对象在JSON定义文件中表示:

{
    "Fields": {
        "Trip_distance": 10
    },
    "Mappers": [
        {
            "Name": "lfm0",
            "Min": -0.5,
            "Max": 3600.5,
            "Res": 3600
        }
    ],
    "BitMappers": [
        {
            "Frame": "dist_miles",
            "Mapper": {
                "Name": "lfm0"
            },
            "Parsers": [
                {"Name": "FloatParser"}
            ],
            "Fields": "Trip_distance"
        }
    ]
}

在这里,咱们定义一个Mappers列表,每一个Mappers都包含一个名称,咱们将在稍后的BitMappers列表中用它来引用mapper。咱们也可使用Parsers进行此操做,但默认状况下可使用一些不须要配置的简单解析器。 咱们还有一个Fields列表,它只是一个字段名称(在源数据中)到列索引(在Pilosa中)的映射。 咱们使用的BitMapper定义这些名字让人可读。

total_amount_dollars:在这里,咱们再次使用舍入映射,所以每行表明一次行程的总成本,该成本将舍入映射为row ID。 BitMapper定义与前一个定义很是类似。

passenger_count:此列包含小整数,所以咱们使用最简单的映射之一:列值是就是row ID。

1列(colums) --> 多字段(multiple field)

使用复合数据类型(如时间戳timestamp)时,有不少映射选项。 在这种状况下,咱们但愿看到有趣的周期性趋势。所以,咱们但愿以一种容许咱们在分析过程当中独立查看它们的方式对时间的循环份量进行编码。

咱们经过将时间数据存储在每一个时间戳的四个单独字段中来实现此目的:对于年year``月month``日day时间time各一个。 前三个是直接映射的。例如,乘坐的日期2015/06/24将在字段year的第2015行,字段month的第6行和字段day的第24行中设置。

咱们可能会在几小时、几分钟和几秒钟内继续使用这种模式,可是咱们在这里没有太多使用这种精度,因此咱们使用bucketing方法。 也就是说,咱们选择一个分辨率(30分钟),将日期划分为该大小的桶,并为每一个桶建立一行。 所以,6:45 AM时间在time_of_day字段的第13行中设置了bit位(位图中)。

咱们针对每一个感兴趣的时间戳执行全部这些操做,一个用于上车时间,一个用于下车时间。 这为咱们提供了两个时间戳的总字段:pickup_yearpickup_monthpickup_daypickup_timedrop_yeardrop_monthdrop_daydrop_time

多列(multiple colums) --> 1字段(field)

乘坐数据还包含地理定位数据:上车和下车的纬度和经度。 咱们只是但愿可以生成乘坐位置的粗略概述热图,所以咱们使用网格映射。 咱们将感兴趣的区域划分为经纬度空间中的100x100矩形网格,使用单个整数标记此网格中的每一个单元格,并使用该整数做为row ID。

咱们为每一个感兴趣的位置作了全部这些,一个用于上车,一个用于下车。 这为两个位置提供了两个字段:pickup_grid_iddrop_grid_id

一样,位置数据有许多映射选项。 例如,咱们可能会转换为不一样的坐标系,应用投影或将位置聚合到实际区域(如邻域neighborhoods)。 这里,简单的方法就足够了。

复合映射(Complex mappings)

咱们还指望寻找行程持续时间和速度的趋势,所以咱们但愿在导入过程当中捕获此信息。 对于字段duration_minutes咱们使用round((drop_timestamp - pickup_timestamp).minutes)计算row ID。 对于字段speed_mph咱们使用round(dist_miles / (drop_timestamp - pickup_timestamp).minutes)计算row ID`。 这些映射计算很简单,但因为它们须要对多列进行算术运算,所以在PDK中提供的基本映射器中捕获它们有点过于复杂。 相反,咱们定义自定义映射器来完成工做:

durm := pdk.CustomMapper{
    Func: func(fields ...interface{}) interface{} {
        start := fields[0].(time.Time)
        end := fields[1].(time.Time)
        return end.Sub(start).Minutes()
    },
    Mapper: lfm,
}

导入处理

设计这个架构和映射后,咱们可使用PDK导入工具读取的JSON定义文件捕获它。 运行pdk taxi会根据此文件中的信息运行导入。 有关更多详细信息,请参阅PDK部分,或查看代码自己。

查询

如今咱们能够运行一些示例查询。

能够在一个PQL调用对cab类型进行检索、排序、计数。

TopN(cab_type)
{"results":[[{"id":1,"count":1992943},{"id":0,"count":7057}]]}

可使用相似的调用检索高流量位置ID。 这些ID对应于经纬度,能够从生成ID的映射中反算。

TopN(pickup_grid_id)
{"results":[[{"id":5060,"count":40620},{"id":4861,"count":38145},{"id":4962,"count":35268},...]]}

每一个passenger_count(乘客计数)的total_amount(总金额)的平均值能够经过一些后处理来计算。咱们使用少许的TopN调用来检索passenger_count的行程计数, 而后使用这些计数来计算平均值。

queries = ''
pcounts = range(10)
for i in pcounts:
    queries += "TopN(Row(passenger_count=%d), total_amount_dollars)" % i
resp = requests.post(qurl, data=queries)

average_amounts = []
for pcount, topn in zip(pcounts, resp.json()['results']):
    wsum = sum([r['count'] * r['key'] for r in topn])
    count = sum([r['count'] for r in topn])
    average_amounts.append(float(wsum)/count)

注意,BSI-powered Sum查询如今提供了这种查询的替代方法。

有关更多示例和详细信息,请参阅此ipython notebook

相关文章
相关标签/搜索