[TOC]html
原文地址:https://www.pilosa.com/docs/latest/examples/ 位图数据库:Pilosa查询十亿级出租车搭乘数据案例python
简单说明 Introduction
纽约市发布了一个很是详细的包含了超过10亿条出租车搭乘数据的集合。该数据已经成为科技博客分析的热门目标,而且已经获得了很好的研究。出于这个缘由,咱们认为将这些数据导入Pilosa
,以便肯定同一数据集状况下与其余数据存储和技术进行比较。linux
通常来讲,传输(Transportation)是Pilosa的值得关注的用例,由于它一般涉及多个不一样的数据源,以及高速率,实时和极大量的数据(特别是若是想得出合理的结论)。git
咱们编写了一个工具来帮助将NYC(纽约市)出租车数据导入Pilosa
这个工具是PDK
(Pilosa开发工具包)的一部分,并利用了许多可重复使用的模块,这些模块也能够帮助您导入其余数据。 接下来,咱们将逐步解释整个过程。github
初始设置以后,PDK导入工具会执行咱们定义Pilosa架构
所需的一切,相应地将数据映射到位图
,而后将其导入Pilosa
。数据库
数据模型 Data Model
纽约出租车数据由如下列出的许多csv
文件组成:http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml。 这些数据文件大约有20列,其中大约一半与咱们正在研究的基准查询相关:json
- 距离(Distance): miles(英里), floating point(浮点值)
- 车费(Fare): dollars(美圆), floating point(浮点值)
- 乘客人数(Number of passengers): integer(整数值)
- 下车位置(Dropoff location): latitude and longitude(经纬度), floating point(浮点值)
- 上车位置(Pickup location): latitude and longitude(经纬度), floating point(浮点值)
- 下车时间(Dropoff time): timestamp(时间戳)
- 上车时间(Pickup time): timestamp(时间戳)
注意:下面表格中的row ID是指记录的取值范围,不要理解成MySQL等数据库的rowID。 咱们导入这些字段,从每一个字段建立一个或多个Pilosa字段:架构
字段(Field) | 映射(Mapping) |
---|---|
cab_type(出租车类型) | 直接映射整数枚举值 → row ID |
dist_miles(距离) | 四舍五入round(dist) → row ID |
total_amount_dollars(总金额) | 四舍五入round(dist) → row ID |
passenger_count(乘车人数) | 直接映射整数值 → row ID |
drop_grid_id(下车位置网格ID) | (lat, lon) → 100x100矩形分割网格 → cell(格子) ID |
drop_year | 年份year(timestamp) → row ID |
drop_month | 月份month(timestamp) → row ID |
drop_day | 该月第几天day(timestamp) → row ID |
drop_time(下车时间) | 该天中的时间映射到48个半小时组成的桶中 |
pickup_grid_id(下车位置网格ID) | (lat, lon) → 100x100矩形分割网格 → cell ID |
pickup_year | year(timestamp) → row ID |
pickup_month | month(timestamp) → row ID |
pickup_day | day(timestamp) → row ID |
pickup_time(上车时间) | time of day mapped to one of 48 half-hour buckets → row ID |
咱们还建立了两个附加字段表示持续时间和每一次乘坐的平均速度:app
字段(Field) | 映射(Mapping) |
---|---|
duration_minutes(持续时间) | round(drop_timestamp - pickup_timestamp) → row ID |
speed_mph() | round(dist_miles ÷ (drop_timestamp - pickup_timestamp)) → row ID |
映射Mapping
咱们要使用的每一个列(column)
都必须根据某些规则映射到字段(fields)
和row ID
的组合。 有不少方法能够实现这个映射,出租车数据集为咱们提供了一个可能性的很好的描述。函数
0列(colums) --> 1字段(field)
cab_type
: 每一行表示一个出租车的类型,在一行数据中有一些bit位来表示一次乘车中使用的出租车类型。 这是一个简单的枚举映射,例如黄色yellow=0
,绿色green=1
等。这个字段值的位宽(bits)由数据源肯定。也就是说,咱们有几个数据来源(NYC出租车-黄色, NYC出租车-绿色,Uber汽车),对于每个数据来源,要设置不一样的cab_type
常量值。
1列(colums) --> 1字段(field)
如下三个字段以简单的直接方式从原始数据的单个列进行映射。 dist_miles
:每一行值(row ID)表示乘车的距离,这个映射关系很简单:例如行值1
(row 1)表示乘车距离在[0.5,1.5]
这个区间内。也就是说,咱们将距离这个浮点值舍入为整数并将其直接用做row ID
。一般,从浮点值到row ID
的映射能够是任意的。 舍入映射实现简洁,简化了导入和分析。而且,它是人类可读的。 咱们会看到这种模式屡次使用。
在PDK
使用中,咱们定义了一个Mapper
,它是一个只返回整数row ID
的函数。 PDK
具备许多预约义的映射器,可使用一些参数进行描述。 其中之一是LinearFloatMapper
。在下面代码中它将线性函数应用于输入,并将其转换为整数,所以隐式处理舍入。
lfm := pdk.LinearFloatMapper{ Min: -0.5, Max: 3600.5, Res: 3601, }
Min
和Max
定义线性函数,Res
肯定输出row ID
的最大容许值。咱们选择这些值以产生一个舍入到最接近的整数
的行为。其余预约义的映射器也有本身的特定参数,一般是两个或三个。
这个映射函数(mapping function)是核心操做,但咱们须要一些其余部分来定义整个过程,它封装在BitMapper对象中。此对象定义要使用的输入数据源的哪些字段(Field
),如何解析它们(Parsers
),要使用的映射(Mapper
)以及要使用的字段的名称(Frame
)。TODO更新,这是合理的。
pdk.BitMapper{ Frame: "dist_miles", Mapper: lfm, Parsers: []pdk.Parser{pdk.FloatParser{}}, Fields: []int{fields["trip_distance"]}, },
这些相同的对象在JSON定义文件中表示:
{ "Fields": { "Trip_distance": 10 }, "Mappers": [ { "Name": "lfm0", "Min": -0.5, "Max": 3600.5, "Res": 3600 } ], "BitMappers": [ { "Frame": "dist_miles", "Mapper": { "Name": "lfm0" }, "Parsers": [ {"Name": "FloatParser"} ], "Fields": "Trip_distance" } ] }
在这里,咱们定义一个Mappers
列表,每一个Mappers
都包含一个名称,咱们将在稍后的BitMappers
列表中用它来引用mapper。咱们也可使用Parsers
进行此操做,但默认状况下可使用一些不须要配置的简单解析器。 咱们还有一个Fields
列表,它只是一个字段名称(在源数据中)到列索引(在Pilosa中)的映射。 咱们使用的BitMapper定义这些名字让人可读。
total_amount_dollars
:在这里,咱们再次使用舍入映射,所以每行表明一次行程的总成本,该成本将舍入映射为row ID。 BitMapper定义与前一个定义很是类似。
passenger_count
:此列包含小整数,所以咱们使用最简单的映射之一:列值是就是row ID。
1列(colums) --> 多字段(multiple field)
使用复合数据类型(如时间戳timestamp)时,有不少映射选项。 在这种状况下,咱们但愿看到有趣的周期性趋势。所以,咱们但愿以一种容许咱们在分析过程当中独立查看它们的方式对时间的循环份量进行编码。
咱们经过将时间数据存储在每一个时间戳的四个单独字段中来实现此目的:对于年year``月month``日day
和时间time
各一个。 前三个是直接映射的。例如,乘坐的日期2015/06/24
将在字段year
的第2015行,字段month
的第6行和字段day
的第24行中设置。
咱们可能会在几小时、几分钟和几秒钟内继续使用这种模式,可是咱们在这里没有太多使用这种精度,因此咱们使用bucketing
方法。 也就是说,咱们选择一个分辨率(30分钟),将日期划分为该大小的桶,并为每一个桶建立一行。 所以,6:45 AM
时间在time_of_day
字段的第13行中设置了bit位(位图中)。
咱们针对每一个感兴趣的时间戳执行全部这些操做,一个用于上车时间,一个用于下车时间。 这为咱们提供了两个时间戳的总字段:pickup_year
,pickup_month
,pickup_day
,pickup_time
,drop_year
,drop_month
,drop_day
,drop_time
。
多列(multiple colums) --> 1字段(field)
乘坐数据还包含地理定位数据:上车和下车的纬度和经度
。 咱们只是但愿可以生成乘坐位置的粗略概述热图,所以咱们使用网格映射
。 咱们将感兴趣的区域划分为经纬度空间中的100x100
矩形网格,使用单个整数标记此网格中的每一个单元格,并使用该整数做为row ID。
咱们为每一个感兴趣的位置作了全部这些,一个用于上车,一个用于下车。 这为两个位置提供了两个字段:pickup_grid_id
,drop_grid_id
。
一样,位置数据有许多映射选项。 例如,咱们可能会转换为不一样的坐标系,应用投影或将位置聚合到实际区域(如邻域neighborhoods)。 这里,简单的方法就足够了。
复合映射(Complex mappings)
咱们还指望寻找行程持续时间和速度的趋势,所以咱们但愿在导入过程当中捕获此信息。 对于字段duration_minutes
咱们使用round((drop_timestamp - pickup_timestamp).minutes)
计算row ID
。 对于字段speed_mph
咱们使用round(dist_miles / (drop_timestamp - pickup_timestamp).minutes)计算
row ID`。 这些映射计算很简单,但因为它们须要对多列进行算术运算,所以在PDK中提供的基本映射器中捕获它们有点过于复杂。 相反,咱们定义自定义映射器来完成工做:
durm := pdk.CustomMapper{ Func: func(fields ...interface{}) interface{} { start := fields[0].(time.Time) end := fields[1].(time.Time) return end.Sub(start).Minutes() }, Mapper: lfm, }
导入处理
设计这个架构和映射后,咱们可使用PDK导入工具
读取的JSON定义文件捕获它。 运行pdk taxi
会根据此文件中的信息运行导入。 有关更多详细信息,请参阅PDK部分,或查看代码自己。
查询
如今咱们能够运行一些示例查询。
能够在一个PQL调用对cab类型进行检索、排序、计数。
TopN(cab_type)
{"results":[[{"id":1,"count":1992943},{"id":0,"count":7057}]]}
可使用相似的调用检索高流量位置ID。 这些ID对应于经纬度,能够从生成ID的映射中反算。
TopN(pickup_grid_id)
{"results":[[{"id":5060,"count":40620},{"id":4861,"count":38145},{"id":4962,"count":35268},...]]}
每一个passenger_count
(乘客计数)的total_amount
(总金额)的平均值能够经过一些后处理来计算。咱们使用少许的TopN
调用来检索passenger_count
的行程计数, 而后使用这些计数来计算平均值。
queries = '' pcounts = range(10) for i in pcounts: queries += "TopN(Row(passenger_count=%d), total_amount_dollars)" % i resp = requests.post(qurl, data=queries) average_amounts = [] for pcount, topn in zip(pcounts, resp.json()['results']): wsum = sum([r['count'] * r['key'] for r in topn]) count = sum([r['count'] for r in topn]) average_amounts.append(float(wsum)/count)
有关更多示例和详细信息,请参阅此ipython notebook。