Apache Crunch:用于简化MapReduce编程的Java库

Apache Crunch(孵化器项目)是基于Google的FlumeJava库编写的Java库,用于建立MapReduce流水线。与其余用来建立MapReduce做业的高层工具(如Apache Hive、Apache Pig和Cascading等)相似,Crunch提供了用于实现如链接数据、执行聚合和排序记录等常见任务的模式库。而与其余工具不一样的是,Crunch并不强制全部输入遵循同一数据类型。相反,Crunch使用了一种定制的类型系统,很是灵活,可以直接处理复杂数据类型,如时间序列、HDF5文件、Apache HBase表和序列化对象(像protocol buffer或Avro记录)等。 html

Crunch并不想阻止开发者以MapReduce方式思考,而是尝试使之简化。尽管MapReduce有诸多优势,但对不少问题而言,并不是正确的抽象级别:大部分有意思的计算都是由多个MapReduce做业组成的,状况每每是这样——出于性能考虑,咱们须要将逻辑上独立的操做(如数据过滤、数据投影和数据变换)组合为一个物理上的MapReduce做业。 git

本质上,Crunch设计为MapReduce之上的一个薄层,但愿在不牺牲MapReduce力量(或者说不影响开发者使用MapReduce API)的前提下,更容易在正确的抽象级别解决手头问题。 github

尽管Crunch会让人想起历史悠久的Cascading API,可是它们各自的数据模型有很大不一样:按照常识简单总结一下,能够认为把问题看作数据流的人会偏心Crunch和Pig,而考虑SQL风格链接的人会偏心Cascading和Hive。 apache

Crunch的理念 设计模式

PCollection和PTable<K, V>是Crunch的核心抽象,前者表明一个分布式、不可变的对象集合,后者是Pcollection的一个子接口,其中包含了处理键值对的额外方法。这两个核心类支持以下四个基本操做: app

  1. parallelDo:将用户定义函数应用于给定PCollection,返回一个新的PCollection做为结果。
  2. groupByKey:将一个PTable中的元素按照键值排序并分组(等同于MapReduce做业中的shuffle阶段)
  3. combineValues:执行一个关联操做来聚合来自groupByKey操做的值。
  4. union:将两个或多个Pcollection看作一个虚拟的PCollection。

Crunch的全部高阶操做(joins、cogroups和set operations等)都是经过这些基本原语实现的。Crunch的做业计划器(job planner)接收流水线开发者定义的操做图,将操做分解为一系列相关的MapReduce做业,而后在Hadoop集群上执行。Crunch也支持内存执行引擎,可用于本地数据上流水线的测试与调试。 框架

有些问题能够从可以操做定制数据类型的大量用户定义函数受益,而Crunch就是为这种问题设计的。Crunch中的用户定义函数设计为轻量级的,为知足应用程序的须要,仍然提供了完整的访问底层MapReduce API的功能。Crunch开发者也可使用Crunch原语来定义API,为客户提供涉及一系列复杂MapReduce做业的高级ETL、机器学习和科学计算功能。 机器学习

Crunch起步 maven

能够从Crunch的网站下载最新版本的源代码或二进制文件,或者使用在Maven Central发布的dependencies分布式

源代码中有不少示例应用。下面是Crunch中WordCount应用的源代码:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.type.writable.Writables;

public class WordCount {
  public static void main(String[] args) throws Exception {
    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class);
    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(args[0]);

    // Define a function that splits each line in a PCollection of Strings into a
    // PCollection made up of the individual words in the file.
    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
	for (String word : line.split("\\s+")) {
	  emitter.emit(word);
	}
      }
    }, Writables.strings()); // Indicates the serialization format

    // The count method applies a series of Crunch primitives and returns
    // a map of the top 20 unique words in the input PCollection to their counts.
    // We then read the results of the MapReduce jobs that performed the
    // computations into the client and write them to stdout.
     for (Pair<String, Long> wordCount : words.count().top(20).materialize()) {
      System.out.println(wordCount);
     }
   }
}

Crunch优化方案

Crunch优化器的目标是尽量减小运行的MapReduce做业数。大多数MapReduce做业都是 IO密集型的,所以访问数据的次数越少越好。公平地说,每种优化器(Hive、Pig、Cascading和Crunch)的工做方式本质上是相同的。但与其余框架不一样的是,Crunch把优化器原语暴露给了客户开发人员,对于像构造ETL流水线或构建并评估一组随机森林模型这样的任务而言,构造可复用的高阶操做更容易。

结论

Crunch目前仍处于Apache的孵化器阶段,咱们很是欢迎社区贡献(参见项目主页)让这个库更好。特别的是,咱们正在寻求更高效的MapReduce编译思想(包括基于成本考虑的优化)、新的MapReduce设计模式,还但愿支持更多的数据源和目标,如HCatalog、Solr和ElasticSearch等。还有不少把Crunch带向如ScalaClojure等其余JVM语言的项目,也有不少使用Crunch以R语言来建立MapReduce流水线的工具。

相关文章
相关标签/搜索