这篇文章翻译自 http://techblog.netflix.com/2014/01/introducing-pigpen-map-reduce-for.html。以前翻译过关于 cascalog 的文章(Cascalog 入门(1),Cascalog 入门(2))。Cascalog 是基于 Cascading,PigPen 是基于 Apache Pig,二者是比较类似的东西。如下进入正文:html
咱们今天很高兴向全世界发布了 PigPen,这是一个为 Clojure 准备的 Map-Reduce,它会最终被编译成 Apache Pig,可是你并不须要很是了解 Pig。java
注意:若是你对 Clojure 不是很熟悉,咱们强烈推荐你试下这里,这里 或者 这里 的教程来了解一些 基础。git
<!--more-->github
若是你会 Clojure,你就已经会 PigPen 了web
PigPen 的主要目标是要把语言带出等式的行列。PigPen 的操做符设计的和 Clojure 里尽量的类似,没有特殊的用户自定义函数(UDFs)。只须要定义函数(匿名的或者命名的),而后你就能像在 Clojure 程序里同样使用它们。express
这里有个经常使用的 word count 的例子:apache
(require '[pigpen.core :as pig]) (defn word-count [lines] (->> lines (pig/mapcat #(-> % first (clojure.string/lower-case) (clojure.string/replace #"[^\w\s]" "") (clojure.string/split #"\s+"))) (pig/group-by identity) (pig/map (fn [[word occurrences]] [word (count occurrences)]))))
这段代码定义了一个函数,这个函数返回一个 PigPen 的查询表达式。这个查询接受一系列的行做为输入,返回每一个单词出现的次数。你能够看到这只是一个 word count 的逻辑,并无设计到一些外部的东西,好比数据从哪里来的,会产生哪些输出。编程
固然。PigPen 的查询是写成函数的组合——数据输入、输出。只须要写一次,不须要处处复制、粘贴。数据结构
如今咱们利用以上定义的 word-count 函数,加上 load 和 store 命令,组成一个 PigPen 的查询:闭包
(defn word-count-query [input output] (->> (pig/load-tsv input) (word-count) (pig/store-tsv output)))
这个函数返回查询的 PigPen 表示,他本身不会作什么,咱们须要从本地执行它,或者生成一个脚本(以后会讲)。
利用 PigPen,你能够 mock 输入数据来为你的查询写单元测试。不再须要交叉着手指想象提交到 cluster 上后会发生什么,也不须要截出部分文件来测试输入输出。
Mock 数据真的很容易,经过 pig/return 和 pig/constantly,你能够在你的脚本里注入任意的数据做为起始点。
一个经常使用的模式是利用 pig/take 来从实际数据源中抽样出几行,用 pig/return 把结果包一层,就获得了 mock 数据。
(use 'clojure.test) (deftest test-word-count (let [data (pig/return [["The fox jumped over the dog."] ["The cow jumped over the moon."]])] (is (= (pig/dump (word-count data)) [["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]))))
pig/dump 操做符会在本地执行查询。
向你的查询传参数很麻烦,全部函数范围内的变量或者 let 的绑定在函数里均可用。
(defn reusable-fn [lower-bound data] (let [upper-bound (+ lower-bound 10)] (pig/filter (fn [x] (< lower-bound x upper-bound)) data)))
注意 lower-bound 和 upper-bound 在生成脚本的时候就有了,在 cluster 上执行函数的时候也能使用。
只要告诉 PigPen 哪里会把一个查询写成一个 Pig 脚本:
(pig/write-script "word-count.pig" (word-count-query "input.tsv" "output.tsv"))
这样你就能获得一个能够提交到 cluster 上运行的 Pig 脚本。这个脚本会用到 pigpen.jar,这是一个加入全部依赖的 uberjar,因此要保证这个 jar 也一块儿被提交了。还能够把你的整个 project 打包成一个 uberjar 而后提交,提交以前记得先重命名。怎么打包成 uberjar 请参照教程。
以前看到,咱们能够用 pig/dump 来本地运行查询,返回 Clojure 数据:
=> (def data (pig/return [["The fox jumped over the dog."] ["The cow jumped over the moon."]])) #'pigpen-demo/data => (pig/dump (word-count data)) [["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]
若是你如今就像开始,请参照 getting started & tutorials。
Map-Reduce 对于处理单台机器搞不定的数据是颇有用,有了 PigPen,你能够像在本地处理数据同样处理海量数据。Map-Reduce 经过把数据分散到可能成千上万的集群节点来达到这一目的,这些节点每一个都会处理少许的数据,全部的处理都是并行的,这样完成一个任务就比单台机器快得多。像 join 和 group 这样的操做,须要多个节点数据集的协调,这种状况会经过公共的 join key 把数据分到同一个分区计算,join key 的同一个值会送到同一个指定的机器。一旦机器上获得了全部可能的值,就能作 join 的操做或者作其余有意思的事。
想看看 PigPen 怎么作 join 的话,就来看看 pig/cogroup 吧。cogroup 接受任意数量的数据集而后根据一个共同的 key 来分组。假设咱们有这样的数据:
foo: {:id 1, :a "abc"} {:id 1, :a "def"} {:id 2, :a "abc"} bar: [1 42] [2 37] [2 3.14] baz: {:my_id "1", :c [1 2 3]]}
若是想要根据 id 分组,能够这样:
(pig/cogroup (foo by :id) (bar by first) (baz by #(-> % :my_id Long/valueOf)) (fn [id foos bars bazs] ...))
前三个参数是要 join 的数据集,每个都会指定一个函数来从数据源中选出 key。最后的一个参数是一个函数,用来把分组结果结合起来。在咱们的例子中,这个函数会被调用两次:
[1 ({:id 1, :a "abc"}, {:id 1, :a "def"}) ([1 42]) ({:my_id "1", :c [1 2 3]]})] [2 ({:id 2, :a "abc"}) ([2 37] [2 3.14]) ()]
这把全部 id 为 1 的值和 id 为 2 的值结合在了一块儿。不一样的键值被独立的分配到不一样的机器。默认状况下,key 能够不在数据源中出现,可是有选项能够指定必须出现。
Hadoop 提供了底层的接口作 map-reduce job,但即使如此仍是有限制的,即一次只会运行一轮 map-reduce,没有数据流和复杂查询的概念。Pig 在 Hadoop 上抽象出一层,但到目前为止,它仍旧只是一门脚本语言,你仍是须要用 UDF 来对数据作一些有意思的事情。PigPen 更进一步的作了抽象,把 map-reduce 作成了一门语言。
若是你刚接触 map-reduce,咱们推荐你看下这里。
注意:PigPen 不是 一个 Clojure 对 Pig 脚本的封装,颇有可能产生的脚本是人看不懂的。
PigPen 设计的和 Clojure 尽量保持一致。Map-Reduce 是函数式编程,那为何不利用一门已存在的强大的函数式编程语言呢?这样不光学习曲线低,并且大多数概念也能更容易的应用到大数据上。
在 PigPen 中,查询被当作 expression tree 处理,每一个操做符都被表示须要的行为信息的 map,这些 map 能够嵌套在一块儿组成一个复杂查询的树形表式。每一个命令包含了指向祖命令的引用。在执行的时候,查询树会被转化成一个有向无环的查询图。这能够很容易的合并重复的命令,优化相关命令的顺序,而且能够利用 debug 信息调试查询。
去重 当咱们把查询表示成操做图的时候,去重是一件很麻烦的事。Clojure 提供了值相等的操做,即若是连个对象的内容相同,它们就相等。若是两个操做有相同的表示,那它们彻底相同,因此在写查询的时候不用担忧重复的命令,它们在执行以前都会被优化。
举个例子,假设咱们有这样两个查询:
(let [even-squares (->> (pig/load-clj "input.clj") (pig/map (fn [x] (* x x))) (pig/filter even?) (pig/store-clj "even-squares.clj")) odd-squares (->> (pig/load-clj "input.clj") (pig/map (fn [x] (* x x))) (pig/filter odd?) (pig/store-clj "odd-squares.clj"))] (pig/script even-squares odd-squares))
在这个查询中,咱们从一个文件加载数据,计算每一个数的平方,而后分红偶数和奇数,操做图看起来是这样: 在此输入图片描述
这符合咱们的查询,可是作了不少额外的工做。咱们加载了 input.clj
两次,全部数的平方也都计算了两次。这看上去可能没有不少工做,可是当你对不少数据作这样的事情,简单的操做累加起来就不少。为了优化这个查询,咱们能够找出相同的操做。看第一眼发现咱们计算平方的操做多是一个候选,可是他们有不一样的父节点,所以不能把他们合并在一块儿。可是咱们能够把加载函数合并,由于他们没有父节点,并且他们加载相同的文件。
如今咱们的图看起来是这样:
如今咱们值加载一次数据,这会省一些时间,但仍是要计算两次平方。由于咱们如今只有一个加载的命令,咱们的 map 操做如今相同,能够合并:
这样咱们就获得了一个优化过的查询,每一个操做都是惟一的。由于咱们每次只会合并一个命令,咱们不会修改查询的逻辑。你能够很容易的生成查询,而不用担忧重复的执行,PigPen 对重复的部分只会执行一次。
序列化 当咱们用 Clojure 处理完数据之后,数据必须序列化成二进制字节,Pig 才能在集群的机器间传数据。这对 PigPen 是一个很昂贵可是必须的过程。幸运的是一个脚本中常常有不少连续的操做能够合成一个操做,这对于没必要要的序列化和反序列化节省了不少时间。例如,任意连续的 map,filter 和 mapcat 操做均可以被重写成一个单独的 mapcat 操做。
咱们经过一些例子来讲明:
在这个例子中,咱们从一个序列化的值(蓝色)4开始,对它反序列化(橙色),执行咱们的 map 函数,而后再把它序列化。
如今咱们来试一个稍微复杂一点的(更现实的)例子。在这个例子中,咱们执行一个 map,一个 mapcat 和一个 filter 函数。
若是你之前没用过 mapcat,我能够告诉你这是对一个值运行一个函数而后返回一串值的操做。那个序列会被 flatten,每一个值都会传给下一步使用。在 Clojure 里,那是 map 和 concat 联合以后的结果,在 Scala 里,这叫作 flatMap,而在 C# 里叫 selectMany。
在下图中,左边的流程是咱们优化以前的查询,右边的是优化以后的。和第一个例子同样,咱们一样从 4 开始,计算平方,而后对这个值作减一的操做,返回自己和加一的操做。Pig 会获得这个值的集合而后作 flatten,使每一个值都成为下一步的输入。注意在和 Pig 交互的时候咱们要序列化和反序列化。第三步,也就是最后一步对数据进行过滤,在这个例子中咱们只保留奇数值。如图所示,咱们在任意两步之间都序列化和反序列化数据。
右边的图显示了优化后的结果。每一个操做都返回了一个元素序列。map 操做返回一个只有单元素 16 的序列,mapcat 也同样,过滤操做返回 0 元素或单元素的序列。经过是这些命令保持一致,咱们能够很容易的把他们合并到一块儿。咱们在一套命令中flattrn 了更多的值序列,可是在步骤之间没有序列化的消耗。虽然卡起来更复杂,可是这个优化是每一个步骤都执行的更快了。
交互式开发,测试,以及可调试性是 PigPen 的关键功能。若是你有一个一次运行好几天的 job,那你最不想看到的是跑了十一个小时后冒出来一个 bug。PigPen 有个基于 rx 的本地运行模式。这可让咱们对查询写单元测试。这样咱们能够更有把握的知道运行的时候不会挂掉,而且能返回期待的值。更牛逼的是这个功能可让咱们进行交互式的开发。
一般状况下,咱们刚开始会从数据源中选一些记录来作单元测试。由于 PigPen 在 REPL 中返回数据,咱们不须要额外构造测试数据。这样,经过 REPL,咱们能够根据须要对 mock 数据作 map,filter,join 和 reduce 的操做。每一个步骤均可以验证结果是否是咱们想要的。这种方法相对于写一长串脚本而后凭空想象能产生更可靠的数据。还有一个有用的地方是能够把复杂的查询写成几个较小的函数单元。Map-reduce 查询随着数据源的量级可能产生剧烈的增长或减小。当你把脚本做为一个总体测试的时候,你可能要读一大堆数据,最后产生一小撮数据。经过把查询细化成较小的单元,你能够对读 100 行,产生 2 行这样子来测试一个单元,而后测试第二个单元的时候能够用这两行做为模板来产生 100 多个数据。
调试模式对于解决异常颇有用,启用后会在正常输出的同时,把脚本中每一个操做的结果写到磁盘上。这对于像 Hadoop 这样的环境颇有用,在这种状况下,你无法单步跟踪代码,并且每一个步骤均可能花好几个小时。调试模式还能够可视化流程图。这样能够可视化的把执行计划的和实际操做的输出关联起来。
要启用调试模式,请参考 pig/write-script 和 pig/generate-script 的选项,这会在指定的目录下写额外的调试输出。
启用调试模式的例子:
(pig/write-script {:debug "/debug-output/"} "my-script.pig" my-pigpen-query)
要启用可视化模式,能够看看 pig/show 和 pig/dump&show。
可视化的例子:
(pig/show my-pigpen-query) ;; Shows a graph of the query (pig/dump&show my-pigpen-query) ;; Shows a graph and runs it locally
PigPen 有个好用的功能是能够很容易的建立本身的操做符。例如,咱们能够定义像求差集和交集这样的集合和多集合的操做符,这些只是像 co-group
这样的操做符的变体,可是若是能定义,测试它们,而后不再去想这些逻辑怎么实现的,那就更好了。
这对更复杂的操做也是颇有用的。对于集合数据咱们有 sum
,avg
,min
,max
,sd
和 quantiles
这些可重用的统计操做符,还有 pivot
这样的操做符能够把多维数据分组而后对每组计数。
这些操做自己都是简单的操做,可是当你把它们从你的查询中抽象出来以后,你的查询也会变的简单不少。这时候你能够花更多的时间去想怎么解决问题,而不是每次都重复写基本的统计方法。
咱们选择 Pig 是由于咱们不想把 Pig 已有的优化的逻辑重写一遍,不考虑语言层面的东西的话,Pig 在移动大数据方面作得很好。咱们的策略是利用 Pig 的 DataByteArray 二进制格式来移动序列化的 Clojure 数据。在大多数状况下,Pig 不须要知道数据的底层展示形式。Byte array 能够很快的作比较,这样对于 join 和 group 操做,Pig 只须要简单的比较序列化的二进制,若是序列化的输出一致,在 Clojure 中值就相等。不过这对于数据排序不适用。二进制的排序其实没什么用,并且和原始数据的排序结果也不同。要想排序,还得把数据转化回去,并且只能对简单类型排序。这也是 Pig 强加给 PigPen 的为数很少的一个缺陷。
咱们在决定作 PigPen 以前也评估过其余语言。第一个要求就是那必须是一门编程语言,并非一种脚本语言加上一堆 UDF。咱们简单看过 Scalding,它看上去颇有前途,可是咱们的团队主要是用的 Clojure。 能够这么说,PigPen 对于 Clojure 就像是 Scalding 对于 Scala。Cascalog 是用 Clojure 写 map-reduce 一般会用的语言,可是从过去的经验来看,Cascalog 对于平常工做其实没什么用,你须要学一套复杂的新语法和不少概念,经过变量名对齐来作隐式 join 也不是理想的方案,若是把操做符顺序弄错了会形成很大的性能问题,Cascalog 会 flatten 数据结果(这可能很浪费),并且组合查询让人感受很别扭。
咱们也考虑过对 PigPen 用一门宿主语言。这样也能在 Hive 之上构建相似的抽象,可是对每一个中间产物都定义 schema 跟 Clojure 的理念不符。并且 Hive 相似与 SQL,使得从功能性语言翻译更难。像 SQL 和 Hive 这样的关系模型语言与像 Clojure 和 Pig 这样的功能性语言之间有着巨大的差。最后,最直接的解决办法就是在 Pig 之上作一层抽象。