在大数据的浪潮之下,技术的更新迭代十分频繁。受技术开源的影响,大数据开发者提供了十分丰富的工具。但也由于如此,增长了开发者选择合适工具的难度。在大数据处理一些问题的时候,每每使用的技术是多样化的。这彻底取决于业务需求,好比进行批处理的MapReduce,实时流处理的Flink,以及SQL交互的Spark SQL等等。而把这些开源框架,工具,类库,平台整合到一块儿,所须要的工做量以及复杂度,可想而知。这也是大数据开发者比较头疼的问题。而今天要分享的就是整合这些资源的一个解决方案,它就是 Apache Beam。java
Apache Beam 最初叫 Apache Dataflow,由谷歌和其合做伙伴向Apache捐赠了大量的核心代码,并创立孵化了该项目。该项目的大部分大码来自于 Cloud Dataflow SDK,其特色有如下几点:git
那 Apache Beam到底能解决哪些问题,它的应用场景是什么,下面咱们能够经过一张图来讲明,以下图所示:github
经过改图,咱们能够很清晰的看到整个技术的发展流向;一部分是谷歌派系,另外一部分则是Apache派系。在开发大数据应用时,咱们有时候使用谷歌的框架,API,类库,平台等,而有时候咱们则使用Apache的,好比:HBase,Flink,Spark等。而咱们要整合这些资源则是一个比较头疼的问题,Apache Beam 的问世,整合这些资源提供了很方便的解决方案。apache
下面,咱们经过一张流程图来看Beam的运行流程,以下图所示:编程
经过上图,咱们能够清楚的知道,执行一个流程分如下步骤:api
Beam SDK 提供了一个统一的编程模型,来处理任意规模的数据集,其中包括有限的数据集,无限的流数据。Apache Beam SDK 使用相同的类来表达有限和无限的数据,一样使用相同的转换方法对数据进行操做。Beam 提供了多种 SDK,你能够选择一种你熟悉的来创建数据处理管道,如上述的 2.1 中的图,咱们能够知道,目前 Beam 支持 Java,Python 以及其余待开发的语言。bash
在 Beam 管道上运行引擎会根据你选择的分布式处理引擎,其中兼容的 API 转换你的 Beam 程序应用,让你的 Beam 应用程序能够有效的运行在指定的分布式处理引擎上。于是,当运行 Beam 程序的时候,你能够按照本身的需求选择一种分布式处理引擎。当前 Beam 支持的管道运行引擎有如下几种:框架
本示例经过使用 Java SDK 来完成,你能够尝试运行在不一样的执行引擎上。maven
关于上述的安装步骤,并非本篇博客的重点,这里笔者就很少赘述了,不明白的能够到官网翻阅文档进行安装。编程语言
Apache Beam 的源代码在 Github 有托管,能够到 Github 下载对应的源码,下载地址:https://github.com/apache/beam
而后,将其中的示例代码进行打包,命令以下所示:
$ mvn archetype:generate \ -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=LATEST \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
此时,命令会建立一个文件夹 word-count-beam,里面包含一个 pom.xml 和相关的代码文件。命令以下所示:
$ cd word-count-beam/ $ ls pom.xml src $ ls src/main/java/org/apache/beam/examples/ DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
一个 Beam 程序能够运行在多个 Beam 的可执行引擎上,包括 ApexRunner,FlinkRunner,SparkRunner 或者 DataflowRunner。另外还有 DirectRunner。不须要特殊的配置就能够在本地执行,方便测试使用。
下面,你能够按需选择你想执行程序的引擎:
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--inputFile=pom.xml --output=counts --runner=ApexRunner" -Papex-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -Pflink-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \ --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
而后,你能够经过访问 http://<flink master>:8081 来监测运行的应用程序。
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \ --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \ -Pdataflow-runner
当程序运行完成后,你能够看到有多个文件以 count 开头,个数取决于执行引擎的类型。当你查看文件的内容的时候,每一个惟一的单词后面会显示其出现次数,可是先后顺序是不固定的,也是分布式引擎为了提升效率的一种经常使用方式。
$ ls counts* $ more counts* api: 9 bundled: 1 old: 4 Apache: 2 The: 1 limitations: 1 Foundation: 1 ...
$ cat counts* BEAM: 1 have: 1 simple: 1 skip: 4 PAssert: 1 ...
$ ls counts* $ more counts* The: 1 api: 9 old: 4 Apache: 2 limitations: 1 bundled: 1 Foundation: 1 ...
$ ls /tmp/counts* $ more /tmp/counts* The: 1 api: 9 old: 4 Apache: 2 limitations: 1 bundled: 1 Foundation: 1 ...
$ ls counts* $ more counts* beam: 27 SF: 1 fat: 1 job: 1 limitations: 1 require: 1 of: 11 profile: 10 ...
$ gsutil ls gs://<your-gcs-bucket>/counts* $ gsutil cat gs://<your-gcs-bucket>/counts* feature: 15 smother'st: 1 revelry: 1 bashfulness: 1 Bashful: 1 Below: 2 deserves: 32 barrenly: 1 ...
Apache Beam 主要针对理想并行的数据处理任务,并经过把数据集拆分多个子数据集,让每一个子数据集可以被单独处理,从而实现总体数据集的并行化处理。固然,也能够用 Beam 来处理抽取,转换和加载任务和数据集成任务(一个ETL过程)。进一步将数据从不一样的存储介质中或者数据源中读取,转换数据格式,最后加载到新的系统中。