Cascalog 入门(1)—— 运行于 Hadoop 的基于 Clojure 的查询语言

运行于 Hadoop 的基于 Clojure 的查询语言

这篇文章翻译自 http://nathanmarz.com/blog/introducing-cascalog-a-clojure-based-query-language-for-hado.html。原文做者是写 StormCascalog 项目的发起人。翻译这篇文章也为了下次须要参考的时候能有个中文版本,毕竟中文的看起来更快一些。html

如下进入正文。git

主要特点

  • 简单: 函数,过滤器(filter)和累加器(aggregators)都用统一的语法,Join 操做都是隐式的,看起来很天然
  • 表现力强: 逻辑表现力强,很容易在查询中运行任意的 Clojure 代码
  • 交互性: 能够在 Clojure REPL 中运行查询语句
  • 可扩展性: Cascalog 的查询语句会被解析为一系列 MapReduce Job
  • 查询任意数据: 经过 Cascalog 的 “Tap” 抽象能够查询 HDFS 文件,数据库数据和(或)本地数据
  • 处理空值: 空值可能很麻烦,Cascalog 有一个叫 “非空变量” 的功能能够简化空值的处理
  • 与 Cascading 的互操做: 为 Cascalog 定以的操做能够用到 Cascading
  • 与 Coljure 的互操做: 可使用 Clojure 函数做为 Cascalog 操做和过滤器,并且由于 Cascalog 是一种 Clojure DSL,能够讲 Cascalog 运用到其余的 Clojure 代码

好吧,咱们如今来看看 Cascalog 里都有哪些东西,我会经过一系列实例来讲明。这些实例都用了 Cascalog 项目中的 “playground” ,因此我建议你把 Cascalog 的代码下载下来,而后在 REPL 环境里跟着我一块儿作(跟着 README 作,只须要花几分钟就能搞定)。github

基本查询

首先启动 REPL 环境,加载 “playground”:数据库

lein repl
user=> (use 'cascalog.playground) (bootstrap)

这会把咱们运行实例须要的全部东西都加载进来。你能够在 playground.clj 里看到咱们等会查询须要用到的数据集。咱们先来执行一条查询来找出全部 25 岁的人:bootstrap

user=> (?<- (stdout) [?person] (age ?person 25))

这个查询能够当作 “Find all ?person for which ?person has an age that is equal to 25”。在做业运行的时候,你能够看到 Hadoop 的 log,几秒钟以后这个查询的结果就会被打印出来。函数

而后再来一个范围查询找出数据集中全部年龄小于 30 岁的人:oop

user=> (?<- (stdout) [?person] (age ?person ?age) (< ?age 30))

这也很简单,此次咱们是把年龄绑定到 ?age,而后再加上一个约束说这个 ?age 要比 30 小。翻译

而后再来一个查询,此次咱们会在结果里包含 ages 和 people:code

user=> (?<- (stdout) [?person ?age] (age ?person ?age)
               (< ?age 30))

咱们要作的只是在查询的 vector 里面加上 ?age。orm

再来一个查询找出 Emily 关注的全部男性(male people):

user=> (?<- (stdout) [?person] (follows "emily" ?person)
               (gender ?person "m"))

你可能没注意,这实际上是一个 Join 操做,两个 ?person 是同一个东西,而因为 “follows” 和 “gender” 是两个独立的数据源,Cascalog 会用一个 Join 操做来解析这个查询。

查询的结构

<!--more-->

而后咱们更具体的看一下查询的结构,以解析如下查询为例:

user=> (?<- (stdout) [?person ?a2] (age ?person ?age)
              (< ?age 30) (* 2 ?age :> ?a2))

咱们用的操做符是 ?<-,这个操做符会定义个查询而后执行。?<- 实际上是对 <-(建立查询的操做符) 和 ?-(执行查询的操做符) 的包装。咱们会在后面建立更复杂的查询的时候再来看看怎么用。

首先,咱们要在查询里说明想把结果发送到哪里,在这里咱们用了 (stdout)(stdout) 会建立一个 Cascalog 的 tap,这个 tap 在查询结束以后把内容写到标准输出。任意的 Cascalog tap 均可以做为输出,也就是说,你能够把输出的数据写到任意的文件格式(如 Sequence file, 普通文本,等等)。

在定义完的输出以后,还须要在一个 Clojure vector 里定义查询的结果变量,这里咱们用了 ?person 和 ?a2。

接下来,要定义一些 “谓词” 来定义和约束结果变量。一共有三种谓词:

1 生成器(Generators):生成器是一个数据源,包含两种:

  • Cascading Tap:好比 HDFS 上的一份数据
  • 一个由 <- 定以的查询

2 操做(Operations):全部变量的一个隐式关系,能够是绑定新变量的函数,或者是一个过滤器(filter)。

3 累加器(Aggregators):countsumminmax,等等

谓词有一个名字,一串输入变量和一串输出变量,咱们上面的谓词有:

  • (age ?person ?age)
  • (< ?age 30)
  • (* 2 ?age :> ?a2)

:> 关键字用来分隔输入变量和输出变量,若是没有指定 :> 关键字,则变量会被当作操做(operations)的输入或者生成器(generators)和累加器(aggregators)的输出。

在 playground.clj 中,age 谓词指向一个 tap,因此它是一个生成器,生成了 ?person 和 ?age。

谓词 < 是一个 Clojure 函数,因为咱们没有指定输出变量,这个谓词将做为一个过滤器,会过滤掉全部的 ?age 小于 30 的记录。若是咱们在这里指定:

(< ?age 30 :> ?young)

那么 < 将做为一个函数,并将一个 boolean 类型的变量绑定到 ?young,表示年龄是否小于 30。

谓词的顺序没有关系。Cascalog 是纯定义型的。

变量和常量替换

变量是以 ?! 开始的符号,若是有时不须要输出变量的值能够用 _ 符号来略过。查询里的其余部分都会被求值而后做为常量插入,这个功能叫 “常量替换”,到目前为止咱们已经用了不少。若是把常量做为输出变量,会对函数的结果作一些过滤,好比:

(* 4 ?v2 :> 100)

这里有两个常量:4 和 100。4 替换了一个输入变量,而 100 做为过滤条件,将只保留乘 4 后等于 100 的 ?v2 的值。字符串,数字,其余基本单元以及任意在 Hadoop serializers 注册过的 Object 均可以做为常量。

再回到例子,咱们来找出 follow 关系中关注比本身年龄小的人:

user=> (?<- (stdout) [?person1 ?person2] 
    (age ?person1 ?age1) (follows ?person1 ?person2)
    (age ?person2 ?age2) (< ?age2 ?age1))

再执行一遍加上年龄差:

user=> (?<- (stdout) [?person1 ?person2 ?delta] 
    (age ?person1 ?age1) (follows ?person1 ?person2)
    (age ?person2 ?age2) (- ?age2 ?age1 :> ?delta)
    (< ?delta 0))

累加器(Aggregators)

如今来看一下咱们的第一个累加器,咱们来找出年龄小于 30 岁的人的数量:

user=> (?<- (stdout) [?count] (age _ ?a) (< ?a 30)
              (c/count ?count))

这个查询会算出记录中全部人的数量,咱们能够按分组累计数量,好比,要找出每一个人关注的人的数量能够这样:

user=> (?<- (stdout) [?person ?count] (follows ?person _)
              (c/count ?count))

由于咱们在查询的结果变量里定义了 ?person,Cascalog 会按照 ?person 分组,而后对每一个分组执行 c/count 累加器。

你也能够在一个查询中使用多个累加器,它们会对同一个记录的分组执行。例子:经过 countsum 的组合来获得一个国家的平均年龄:

user=> (?<- (stdout) [?country ?avg] 
   (location ?person ?country _ _) (age ?person ?age)
   (c/count ?count) (c/sum ?age :> ?sum)
   (div ?sum ?count :> ?avg))

注意,咱们对最后累计后的结果用了 div 操做。依赖与累加器的输出变量的操做都是在累加器运行完以后再执行的。

自定义操做

接下来来写一个统计一组句子中每一个次出现的次数,首先用这个查询定义一个自定义操做:

user=> (defmapcatop split [sentence]
       (seq (.split sentence "\\s+")))

user=> (?<- (stdout) [?word ?count] (sentence ?s)
              (split ?s :> ?word) (c/count ?count))

defmapcatop split 定义的操做把只有一个字段的 sentence 做为输入,输出 0 个或多个元组。

deffilterop 定义返回布尔值的过滤操做,表示这个元组会不会被过滤掉。

defmapop 定义的函数只返回一个元组。

defaggregateop 定义一个累加器。

这些操做也能够在 Cascalog 的 workflow API 中被直接使用。

咱们的 word count 的查询还有一个问题,就是在会区分大小写,咱们能够这样修改:

user=> (defn lowercase [w] (.toLowerCase w))

user=> (?<- (stdout) [?word ?count] 
        (sentence ?s) (split ?s :> ?word1)
        (lowercase ?word1 :> ?word) (c/count ?count))

就如你所看到的,普通的 Clojure 函数也能够被当作操做来使用。Clojure 函数在没有输入参数的时候是一个过滤器,在有输入参数的时候是一个 map 操做。想要输出 0 个或多个元组必须用 defmapcatop

这还有一个查询,会统计按年龄和性别分组后各组人的数量:

user=> (defn agebucket [age] 
        (find-first (partial <= age) [17 25 35 45 55 65 100 200]))

user=> (?<- (stdout) [?bucket ?gender ?count] 
        (age ?person ?age) (gender ?person ?gender)
        (agebucket ?age :> ?bucket) (c/count ?count))

非空变量

Cascalog 有个叫 “非空变量” 的功能,可让你更优雅的处理空值。咱们到目前为止一直在用非空变量。以 ? 开头的变量都是非空变量,以 ! 开头的变量是能够为空的变量,当非空变量被绑上控制时,Cascalog 会过滤掉。

咱们比较下面两个查询来看看非空变量的效果:

user=> (?<- (stdout) [?person ?city] (location ?person _ _ ?city))

user=> (?<- (stdout) [?person !city] (location ?person _ _ !city))

第二个查询在结果集中会包含一些空值。

子查询

最后,咱们来看一看由子查询组成的复杂查询。先查出一对 follow 关系中的两我的都 follow 2 个以上人的集合:

user=> (let [many-follows (<- [?person] (follows ?person _)
                               (c/count ?c) (> ?c 2))]
        (?<- (stdout) [?person1 ?person2] (many-follows ?person1)
         (many-follows ?person2) (follows ?person1 ?person2)))

这里咱们用 let 定义一个 many-follow 子查询,这个子查询使用 <- 来定义。而后就能够在 let 的 body 里使用 many-follow 子查询。

还能够运行包含多个输出的查询。若是咱们还想要上面查询中 many-follow 子查询的结果,能够这样写:

user=> (let [many-follows (<- [?person] (follows ?person _)
                             (c/count ?c) (> ?c 2))
      active-follows (<- [?p1 ?p2] (many-follows ?p1)
                       (many-follows ?p2) (follows ?p1 ?p2))]
    (?- (stdout) many-follows (stdout) active-follows))

这里咱们先定义了两个查询,可是没有执行。在最后才用查询操做符 ?- 来顺序执行两个查询。

相关文章
相关标签/搜索