PigPen 介绍:Clojure 的 Map-Reduce

这篇文章翻译自 http://techblog.netflix.com/2014/01/introducing-pigpen-map-reduce-for.html。以前翻译过关于 cascalog 的文章(Cascalog 入门(1)Cascalog 入门(2))。Cascalog 是基于 Cascading,PigPen 是基于 Apache Pig,二者是比较类似的东西。如下进入正文:html

PigPen

咱们今天很高兴向全世界发布了 PigPen,这是一个为 Clojure 准备的 Map-Reduce,它会最终被编译成 Apache Pig,可是你并不须要很是了解 Pig。java

PigPen 是什么?

  • 一种看起来和用起来跟 clojure.core 都很像的 map-reduce 语言
  • 能够把 map-reduce 的查询当成程序来写,而不是当成脚原本写
  • 为单元测试和迭代部署提供强大的支持

注意:若是你对 Clojure 不是很熟悉,咱们强烈推荐你试下这里这里 或者 这里 的教程来了解一些 基础git

<!--more-->github

真的又是一种 map-reduce 语言吗?

若是你会 Clojure,你就已经会 PigPen 了web

PigPen 的主要目标是要把语言带出等式的行列。PigPen 的操做符设计的和 Clojure 里尽量的类似,没有特殊的用户自定义函数(UDFs)。只须要定义函数(匿名的或者命名的),而后你就能像在 Clojure 程序里同样使用它们。express

这里有个经常使用的 word count 的例子:apache

(require '[pigpen.core :as pig])

(defn word-count [lines]
  (->> lines
    (pig/mapcat #(-> % first
                   (clojure.string/lower-case)
                   (clojure.string/replace #"[^\w\s]" "")
                   (clojure.string/split #"\s+")))
    (pig/group-by identity)
    (pig/map (fn [[word occurrences]] [word (count occurrences)]))))

这段代码定义了一个函数,这个函数返回一个 PigPen 的查询表达式。这个查询接受一系列的行做为输入,返回每一个单词出现的次数。你能够看到这只是一个 word count 的逻辑,并无设计到一些外部的东西,好比数据从哪里来的,会产生哪些输出。编程

能够组合吗?

固然。PigPen 的查询是写成函数的组合——数据输入、输出。只须要写一次,不须要处处复制、粘贴。数据结构

如今咱们利用以上定义的 word-count 函数,加上 load 和 store 命令,组成一个 PigPen 的查询:闭包

(defn word-count-query [input output]
  (->>
    (pig/load-tsv input)
    (word-count)
    (pig/store-tsv output)))

这个函数返回查询的 PigPen 表示,他本身不会作什么,咱们须要从本地执行它,或者生成一个脚本(以后会讲)。

你喜欢单元测试?咱们能够作

利用 PigPen,你能够 mock 输入数据来为你的查询写单元测试。不再须要交叉着手指想象提交到 cluster 上后会发生什么,也不须要截出部分文件来测试输入输出。

Mock 数据真的很容易,经过 pig/return 和 pig/constantly,你能够在你的脚本里注入任意的数据做为起始点。

一个经常使用的模式是利用 pig/take 来从实际数据源中抽样出几行,用 pig/return 把结果包一层,就获得了 mock 数据。

(use 'clojure.test)

(deftest test-word-count
  (let [data (pig/return [["The fox jumped over the dog."]
                          ["The cow jumped over the moon."]])]
    (is (= (pig/dump (word-count data))
           [["moon" 1]
            ["jumped" 2]
            ["dog" 1]
            ["over" 2]
            ["cow" 1]
            ["fox" 1]
            ["the" 4]]))))

pig/dump 操做符会在本地执行查询。

闭包

向你的查询传参数很麻烦,全部函数范围内的变量或者 let 的绑定在函数里均可用。

(defn reusable-fn [lower-bound data]
  (let [upper-bound (+ lower-bound 10)]
    (pig/filter (fn [x] (< lower-bound x upper-bound)) data)))

注意 lower-bound 和 upper-bound 在生成脚本的时候就有了,在 cluster 上执行函数的时候也能使用。

那么我怎么用它呢?

只要告诉 PigPen 哪里会把一个查询写成一个 Pig 脚本:

(pig/write-script "word-count.pig"
                  (word-count-query "input.tsv" "output.tsv"))

这样你就能获得一个能够提交到 cluster 上运行的 Pig 脚本。这个脚本会用到 pigpen.jar,这是一个加入全部依赖的 uberjar,因此要保证这个 jar 也一块儿被提交了。还能够把你的整个 project 打包成一个 uberjar 而后提交,提交以前记得先重命名。怎么打包成 uberjar 请参照教程。

以前看到,咱们能够用 pig/dump 来本地运行查询,返回 Clojure 数据:

=> (def data (pig/return [["The fox jumped over the dog."]
                          ["The cow jumped over the moon."]]))
                          
#'pigpen-demo/data

=> (pig/dump (word-count data))
[["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]

若是你如今就像开始,请参照 getting started & tutorials

为何我须要 Map-Reduce?

Map-Reduce 对于处理单台机器搞不定的数据是颇有用,有了 PigPen,你能够像在本地处理数据同样处理海量数据。Map-Reduce 经过把数据分散到可能成千上万的集群节点来达到这一目的,这些节点每一个都会处理少许的数据,全部的处理都是并行的,这样完成一个任务就比单台机器快得多。像 join 和 group 这样的操做,须要多个节点数据集的协调,这种状况会经过公共的 join key 把数据分到同一个分区计算,join key 的同一个值会送到同一个指定的机器。一旦机器上获得了全部可能的值,就能作 join 的操做或者作其余有意思的事。

想看看 PigPen 怎么作 join 的话,就来看看 pig/cogroup 吧。cogroup 接受任意数量的数据集而后根据一个共同的 key 来分组。假设咱们有这样的数据:

foo:

  {:id 1, :a "abc"}
  {:id 1, :a "def"}
  {:id 2, :a "abc"}

bar:

  [1 42]
  [2 37]
  [2 3.14]

baz:

  {:my_id "1", :c [1 2 3]]}

若是想要根据 id 分组,能够这样:

(pig/cogroup (foo by :id)
             (bar by first)
             (baz by #(-> % :my_id Long/valueOf))
             (fn [id foos bars bazs] ...))

前三个参数是要 join 的数据集,每个都会指定一个函数来从数据源中选出 key。最后的一个参数是一个函数,用来把分组结果结合起来。在咱们的例子中,这个函数会被调用两次:

[1 ({:id 1, :a "abc"}, {:id 1, :a "def"})
   ([1 42])
   ({:my_id "1", :c [1 2 3]]})]
[2 ({:id 2, :a "abc"})
   ([2 37] [2 3.14])
   ()]

这把全部 id 为 1 的值和 id 为 2 的值结合在了一块儿。不一样的键值被独立的分配到不一样的机器。默认状况下,key 能够不在数据源中出现,可是有选项能够指定必须出现。

Hadoop 提供了底层的接口作 map-reduce job,但即使如此仍是有限制的,即一次只会运行一轮 map-reduce,没有数据流和复杂查询的概念。Pig 在 Hadoop 上抽象出一层,但到目前为止,它仍旧只是一门脚本语言,你仍是须要用 UDF 来对数据作一些有意思的事情。PigPen 更进一步的作了抽象,把 map-reduce 作成了一门语言。

若是你刚接触 map-reduce,咱们推荐你看下这里

作 PigPen 的动机

  • **代码重用。**咱们但愿能定义一段逻辑,而后经过穿参数把它用到不一样的 job 里。
  • **代码一体化。**咱们不想在脚本和不一样语言写的 UDF。 之间换来换去,不想考虑不一样数据类型在不一样语言中的对应关系。
  • **组织好代码。**咱们想把代码写在多个文件里,想怎么组织怎么组织,不要被约束在文件所属的 job 里。
  • **单元测试。**咱们想让咱们的抽样数据关联上咱们的单元测试,咱们想让咱们的单元测试在不存取数据的状况下测试业务逻辑。
  • **快速迭代。**咱们想可以在任什么时候候注入 mock data,咱们想在不用等 JVM 启动的状况下测试一个查询。
  • **只给想要命名的东西命名。**大部分 map-reduce 语言对中间结果要求命名和指定数据结构,这使得用 mock data 来测试单独的 job 变得困难。咱们想要在咱们以为合适的地方组织业务逻辑并命名,而不是受语言的指使。
  • 咱们受够了写脚本,咱们想要写程序。

注意:PigPen 不是 一个 Clojure 对 Pig 脚本的封装,颇有可能产生的脚本是人看不懂的。

设计和功能

PigPen 设计的和 Clojure 尽量保持一致。Map-Reduce 是函数式编程,那为何不利用一门已存在的强大的函数式编程语言呢?这样不光学习曲线低,并且大多数概念也能更容易的应用到大数据上。

在 PigPen 中,查询被当作 expression tree 处理,每一个操做符都被表示须要的行为信息的 map,这些 map 能够嵌套在一块儿组成一个复杂查询的树形表式。每一个命令包含了指向祖命令的引用。在执行的时候,查询树会被转化成一个有向无环的查询图。这能够很容易的合并重复的命令,优化相关命令的顺序,而且能够利用 debug 信息调试查询。

优化

去重 当咱们把查询表示成操做图的时候,去重是一件很麻烦的事。Clojure 提供了值相等的操做,即若是连个对象的内容相同,它们就相等。若是两个操做有相同的表示,那它们彻底相同,因此在写查询的时候不用担忧重复的命令,它们在执行以前都会被优化。

举个例子,假设咱们有这样两个查询:

(let [even-squares (->>
                     (pig/load-clj "input.clj")
                     (pig/map (fn [x] (* x x)))
                     (pig/filter even?)
                     (pig/store-clj "even-squares.clj"))
      odd-squares (->>
                    (pig/load-clj "input.clj")
                    (pig/map (fn [x] (* x x)))
                    (pig/filter odd?)
                    (pig/store-clj "odd-squares.clj"))]
  (pig/script even-squares odd-squares))

在这个查询中,咱们从一个文件加载数据,计算每一个数的平方,而后分红偶数和奇数,操做图看起来是这样: 在此输入图片描述

这符合咱们的查询,可是作了不少额外的工做。咱们加载了 input.clj 两次,全部数的平方也都计算了两次。这看上去可能没有不少工做,可是当你对不少数据作这样的事情,简单的操做累加起来就不少。为了优化这个查询,咱们能够找出相同的操做。看第一眼发现咱们计算平方的操做多是一个候选,可是他们有不一样的父节点,所以不能把他们合并在一块儿。可是咱们能够把加载函数合并,由于他们没有父节点,并且他们加载相同的文件。

如今咱们的图看起来是这样: 在此输入图片描述

如今咱们值加载一次数据,这会省一些时间,但仍是要计算两次平方。由于咱们如今只有一个加载的命令,咱们的 map 操做如今相同,能够合并: 在此输入图片描述

这样咱们就获得了一个优化过的查询,每一个操做都是惟一的。由于咱们每次只会合并一个命令,咱们不会修改查询的逻辑。你能够很容易的生成查询,而不用担忧重复的执行,PigPen 对重复的部分只会执行一次。

序列化 当咱们用 Clojure 处理完数据之后,数据必须序列化成二进制字节,Pig 才能在集群的机器间传数据。这对 PigPen 是一个很昂贵可是必须的过程。幸运的是一个脚本中常常有不少连续的操做能够合成一个操做,这对于没必要要的序列化和反序列化节省了不少时间。例如,任意连续的 map,filter 和 mapcat 操做均可以被重写成一个单独的 mapcat 操做。

咱们经过一些例子来讲明: 在此输入图片描述

在这个例子中,咱们从一个序列化的值(蓝色)4开始,对它反序列化(橙色),执行咱们的 map 函数,而后再把它序列化。

如今咱们来试一个稍微复杂一点的(更现实的)例子。在这个例子中,咱们执行一个 map,一个 mapcat 和一个 filter 函数。

若是你之前没用过 mapcat,我能够告诉你这是对一个值运行一个函数而后返回一串值的操做。那个序列会被 flatten,每一个值都会传给下一步使用。在 Clojure 里,那是 map 和 concat 联合以后的结果,在 Scala 里,这叫作 flatMap,而在 C# 里叫 selectMany。

在下图中,左边的流程是咱们优化以前的查询,右边的是优化以后的。和第一个例子同样,咱们一样从 4 开始,计算平方,而后对这个值作减一的操做,返回自己和加一的操做。Pig 会获得这个值的集合而后作 flatten,使每一个值都成为下一步的输入。注意在和 Pig 交互的时候咱们要序列化和反序列化。第三步,也就是最后一步对数据进行过滤,在这个例子中咱们只保留奇数值。如图所示,咱们在任意两步之间都序列化和反序列化数据。

右边的图显示了优化后的结果。每一个操做都返回了一个元素序列。map 操做返回一个只有单元素 16 的序列,mapcat 也同样,过滤操做返回 0 元素或单元素的序列。经过是这些命令保持一致,咱们能够很容易的把他们合并到一块儿。咱们在一套命令中flattrn 了更多的值序列,可是在步骤之间没有序列化的消耗。虽然卡起来更复杂,可是这个优化是每一个步骤都执行的更快了。 在此输入图片描述

测试,本地执行,以及调试

交互式开发,测试,以及可调试性是 PigPen 的关键功能。若是你有一个一次运行好几天的 job,那你最不想看到的是跑了十一个小时后冒出来一个 bug。PigPen 有个基于 rx 的本地运行模式。这可让咱们对查询写单元测试。这样咱们能够更有把握的知道运行的时候不会挂掉,而且能返回期待的值。更牛逼的是这个功能可让咱们进行交互式的开发。

一般状况下,咱们刚开始会从数据源中选一些记录来作单元测试。由于 PigPen 在 REPL 中返回数据,咱们不须要额外构造测试数据。这样,经过 REPL,咱们能够根据须要对 mock 数据作 map,filter,join 和 reduce 的操做。每一个步骤均可以验证结果是否是咱们想要的。这种方法相对于写一长串脚本而后凭空想象能产生更可靠的数据。还有一个有用的地方是能够把复杂的查询写成几个较小的函数单元。Map-reduce 查询随着数据源的量级可能产生剧烈的增长或减小。当你把脚本做为一个总体测试的时候,你可能要读一大堆数据,最后产生一小撮数据。经过把查询细化成较小的单元,你能够对读 100 行,产生 2 行这样子来测试一个单元,而后测试第二个单元的时候能够用这两行做为模板来产生 100 多个数据。

调试模式对于解决异常颇有用,启用后会在正常输出的同时,把脚本中每一个操做的结果写到磁盘上。这对于像 Hadoop 这样的环境颇有用,在这种状况下,你无法单步跟踪代码,并且每一个步骤均可能花好几个小时。调试模式还能够可视化流程图。这样能够可视化的把执行计划的和实际操做的输出关联起来。

要启用调试模式,请参考 pig/write-scriptpig/generate-script 的选项,这会在指定的目录下写额外的调试输出。

启用调试模式的例子:

(pig/write-script {:debug "/debug-output/"} "my-script.pig" my-pigpen-query)

要启用可视化模式,能够看看 pig/showpig/dump&show

可视化的例子:

(pig/show my-pigpen-query)        ;; Shows a graph of the query
(pig/dump&show my-pigpen-query)   ;; Shows a graph and runs it locally

扩展 PigPen

PigPen 有个好用的功能是能够很容易的建立本身的操做符。例如,咱们能够定义像求差集和交集这样的集合和多集合的操做符,这些只是像 co-group 这样的操做符的变体,可是若是能定义,测试它们,而后不再去想这些逻辑怎么实现的,那就更好了。

这对更复杂的操做也是颇有用的。对于集合数据咱们有 sumavgminmaxsdquantiles 这些可重用的统计操做符,还有 pivot 这样的操做符能够把多维数据分组而后对每组计数。

这些操做自己都是简单的操做,可是当你把它们从你的查询中抽象出来以后,你的查询也会变的简单不少。这时候你能够花更多的时间去想怎么解决问题,而不是每次都重复写基本的统计方法。

为何用 Pig?

咱们选择 Pig 是由于咱们不想把 Pig 已有的优化的逻辑重写一遍,不考虑语言层面的东西的话,Pig 在移动大数据方面作得很好。咱们的策略是利用 Pig 的 DataByteArray 二进制格式来移动序列化的 Clojure 数据。在大多数状况下,Pig 不须要知道数据的底层展示形式。Byte array 能够很快的作比较,这样对于 join 和 group 操做,Pig 只须要简单的比较序列化的二进制,若是序列化的输出一致,在 Clojure 中值就相等。不过这对于数据排序不适用。二进制的排序其实没什么用,并且和原始数据的排序结果也不同。要想排序,还得把数据转化回去,并且只能对简单类型排序。这也是 Pig 强加给 PigPen 的为数很少的一个缺陷。

咱们在决定作 PigPen 以前也评估过其余语言。第一个要求就是那必须是一门编程语言,并非一种脚本语言加上一堆 UDF。咱们简单看过 Scalding,它看上去颇有前途,可是咱们的团队主要是用的 Clojure。 能够这么说,PigPen 对于 Clojure 就像是 Scalding 对于 Scala。Cascalog 是用 Clojure 写 map-reduce 一般会用的语言,可是从过去的经验来看,Cascalog 对于平常工做其实没什么用,你须要学一套复杂的新语法和不少概念,经过变量名对齐来作隐式 join 也不是理想的方案,若是把操做符顺序弄错了会形成很大的性能问题,Cascalog 会 flatten 数据结果(这可能很浪费),并且组合查询让人感受很别扭。

咱们也考虑过对 PigPen 用一门宿主语言。这样也能在 Hive 之上构建相似的抽象,可是对每一个中间产物都定义 schema 跟 Clojure 的理念不符。并且 Hive 相似与 SQL,使得从功能性语言翻译更难。像 SQL 和 Hive 这样的关系模型语言与像 Clojure 和 Pig 这样的功能性语言之间有着巨大的差。最后,最直接的解决办法就是在 Pig 之上作一层抽象。

相关文章
相关标签/搜索