这篇文章翻译自 http://nathanmarz.com/blog/introducing-cascalog-a-clojure-based-query-language-for-hado.html。原文做者是写 Storm 和 Cascalog 项目的发起人。翻译这篇文章也为了下次须要参考的时候能有个中文版本,毕竟中文的看起来更快一些。html
如下进入正文。git
好吧,咱们如今来看看 Cascalog 里都有哪些东西,我会经过一系列实例来讲明。这些实例都用了 Cascalog 项目中的 “playground” ,因此我建议你把 Cascalog 的代码下载下来,而后在 REPL 环境里跟着我一块儿作(跟着 README 作,只须要花几分钟就能搞定)。github
首先启动 REPL 环境,加载 “playground”:数据库
lein repl user=> (use 'cascalog.playground) (bootstrap)
这会把咱们运行实例须要的全部东西都加载进来。你能够在 playground.clj 里看到咱们等会查询须要用到的数据集。咱们先来执行一条查询来找出全部 25 岁的人:bootstrap
user=> (?<- (stdout) [?person] (age ?person 25))
这个查询能够当作 “Find all ?person for which ?person has an age that is equal to 25”。在做业运行的时候,你能够看到 Hadoop 的 log,几秒钟以后这个查询的结果就会被打印出来。函数
而后再来一个范围查询找出数据集中全部年龄小于 30 岁的人:oop
user=> (?<- (stdout) [?person] (age ?person ?age) (< ?age 30))
这也很简单,此次咱们是把年龄绑定到 ?age,而后再加上一个约束说这个 ?age 要比 30 小。翻译
而后再来一个查询,此次咱们会在结果里包含 ages 和 people:code
user=> (?<- (stdout) [?person ?age] (age ?person ?age) (< ?age 30))
咱们要作的只是在查询的 vector 里面加上 ?age。orm
再来一个查询找出 Emily 关注的全部男性(male people):
user=> (?<- (stdout) [?person] (follows "emily" ?person) (gender ?person "m"))
你可能没注意,这实际上是一个 Join 操做,两个 ?person 是同一个东西,而因为 “follows” 和 “gender” 是两个独立的数据源,Cascalog 会用一个 Join 操做来解析这个查询。
<!--more-->
而后咱们更具体的看一下查询的结构,以解析如下查询为例:
user=> (?<- (stdout) [?person ?a2] (age ?person ?age) (< ?age 30) (* 2 ?age :> ?a2))
咱们用的操做符是 ?<-
,这个操做符会定义个查询而后执行。?<-
实际上是对 <-
(建立查询的操做符) 和 ?-
(执行查询的操做符) 的包装。咱们会在后面建立更复杂的查询的时候再来看看怎么用。
首先,咱们要在查询里说明想把结果发送到哪里,在这里咱们用了 (stdout)
,(stdout)
会建立一个 Cascalog 的 tap,这个 tap 在查询结束以后把内容写到标准输出。任意的 Cascalog tap 均可以做为输出,也就是说,你能够把输出的数据写到任意的文件格式(如 Sequence file, 普通文本,等等)。
在定义完的输出以后,还须要在一个 Clojure vector 里定义查询的结果变量,这里咱们用了 ?person 和 ?a2。
接下来,要定义一些 “谓词” 来定义和约束结果变量。一共有三种谓词:
1 生成器(Generators):生成器是一个数据源,包含两种:
<-
定以的查询2 操做(Operations):全部变量的一个隐式关系,能够是绑定新变量的函数,或者是一个过滤器(filter)。
3 累加器(Aggregators):count
,sum
,min
,max
,等等
谓词有一个名字,一串输入变量和一串输出变量,咱们上面的谓词有:
:>
关键字用来分隔输入变量和输出变量,若是没有指定 :>
关键字,则变量会被当作操做(operations)的输入或者生成器(generators)和累加器(aggregators)的输出。
在 playground.clj 中,age
谓词指向一个 tap,因此它是一个生成器,生成了 ?person 和 ?age。
谓词 <
是一个 Clojure 函数,因为咱们没有指定输出变量,这个谓词将做为一个过滤器,会过滤掉全部的 ?age 小于 30 的记录。若是咱们在这里指定:
(< ?age 30 :> ?young)
那么 <
将做为一个函数,并将一个 boolean 类型的变量绑定到 ?young,表示年龄是否小于 30。
谓词的顺序没有关系。Cascalog 是纯定义型的。
变量是以 ?
或 !
开始的符号,若是有时不须要输出变量的值能够用 _
符号来略过。查询里的其余部分都会被求值而后做为常量插入,这个功能叫 “常量替换”,到目前为止咱们已经用了不少。若是把常量做为输出变量,会对函数的结果作一些过滤,好比:
(* 4 ?v2 :> 100)
这里有两个常量:4 和 100。4 替换了一个输入变量,而 100 做为过滤条件,将只保留乘 4 后等于 100 的 ?v2 的值。字符串,数字,其余基本单元以及任意在 Hadoop serializers 注册过的 Object 均可以做为常量。
再回到例子,咱们来找出 follow 关系中关注比本身年龄小的人:
user=> (?<- (stdout) [?person1 ?person2] (age ?person1 ?age1) (follows ?person1 ?person2) (age ?person2 ?age2) (< ?age2 ?age1))
再执行一遍加上年龄差:
user=> (?<- (stdout) [?person1 ?person2 ?delta] (age ?person1 ?age1) (follows ?person1 ?person2) (age ?person2 ?age2) (- ?age2 ?age1 :> ?delta) (< ?delta 0))
如今来看一下咱们的第一个累加器,咱们来找出年龄小于 30 岁的人的数量:
user=> (?<- (stdout) [?count] (age _ ?a) (< ?a 30) (c/count ?count))
这个查询会算出记录中全部人的数量,咱们能够按分组累计数量,好比,要找出每一个人关注的人的数量能够这样:
user=> (?<- (stdout) [?person ?count] (follows ?person _) (c/count ?count))
由于咱们在查询的结果变量里定义了 ?person,Cascalog 会按照 ?person 分组,而后对每一个分组执行 c/count
累加器。
你也能够在一个查询中使用多个累加器,它们会对同一个记录的分组执行。例子:经过 count
和 sum
的组合来获得一个国家的平均年龄:
user=> (?<- (stdout) [?country ?avg] (location ?person ?country _ _) (age ?person ?age) (c/count ?count) (c/sum ?age :> ?sum) (div ?sum ?count :> ?avg))
注意,咱们对最后累计后的结果用了 div
操做。依赖与累加器的输出变量的操做都是在累加器运行完以后再执行的。
接下来来写一个统计一组句子中每一个次出现的次数,首先用这个查询定义一个自定义操做:
user=> (defmapcatop split [sentence] (seq (.split sentence "\\s+"))) user=> (?<- (stdout) [?word ?count] (sentence ?s) (split ?s :> ?word) (c/count ?count))
defmapcatop split
定义的操做把只有一个字段的 sentence 做为输入,输出 0 个或多个元组。
deffilterop
定义返回布尔值的过滤操做,表示这个元组会不会被过滤掉。
defmapop
定义的函数只返回一个元组。
defaggregateop
定义一个累加器。
这些操做也能够在 Cascalog 的 workflow API 中被直接使用。
咱们的 word count 的查询还有一个问题,就是在会区分大小写,咱们能够这样修改:
user=> (defn lowercase [w] (.toLowerCase w)) user=> (?<- (stdout) [?word ?count] (sentence ?s) (split ?s :> ?word1) (lowercase ?word1 :> ?word) (c/count ?count))
就如你所看到的,普通的 Clojure 函数也能够被当作操做来使用。Clojure 函数在没有输入参数的时候是一个过滤器,在有输入参数的时候是一个 map 操做。想要输出 0 个或多个元组必须用 defmapcatop
。
这还有一个查询,会统计按年龄和性别分组后各组人的数量:
user=> (defn agebucket [age] (find-first (partial <= age) [17 25 35 45 55 65 100 200])) user=> (?<- (stdout) [?bucket ?gender ?count] (age ?person ?age) (gender ?person ?gender) (agebucket ?age :> ?bucket) (c/count ?count))
Cascalog 有个叫 “非空变量” 的功能,可让你更优雅的处理空值。咱们到目前为止一直在用非空变量。以 ?
开头的变量都是非空变量,以 !
开头的变量是能够为空的变量,当非空变量被绑上控制时,Cascalog 会过滤掉。
咱们比较下面两个查询来看看非空变量的效果:
user=> (?<- (stdout) [?person ?city] (location ?person _ _ ?city)) user=> (?<- (stdout) [?person !city] (location ?person _ _ !city))
第二个查询在结果集中会包含一些空值。
最后,咱们来看一看由子查询组成的复杂查询。先查出一对 follow 关系中的两我的都 follow 2 个以上人的集合:
user=> (let [many-follows (<- [?person] (follows ?person _) (c/count ?c) (> ?c 2))] (?<- (stdout) [?person1 ?person2] (many-follows ?person1) (many-follows ?person2) (follows ?person1 ?person2)))
这里咱们用 let
定义一个 many-follow 子查询,这个子查询使用 <-
来定义。而后就能够在 let
的 body 里使用 many-follow 子查询。
还能够运行包含多个输出的查询。若是咱们还想要上面查询中 many-follow 子查询的结果,能够这样写:
user=> (let [many-follows (<- [?person] (follows ?person _) (c/count ?c) (> ?c 2)) active-follows (<- [?p1 ?p2] (many-follows ?p1) (many-follows ?p2) (follows ?p1 ?p2))] (?- (stdout) many-follows (stdout) active-follows))
这里咱们先定义了两个查询,可是没有执行。在最后才用查询操做符 ?-
来顺序执行两个查询。