策划 & 审校 | Natalie做者 | 张海涛编辑 | Linda AI 前线导读: 本文是 Apache Beam 实战指南系列文章第五篇内容,将对 Beam 框架中的 pipeline 管道进行剖析,并结合应用示例介绍如何设计和应用 Beam 管道。系列文章第一篇回顾 Apache Beam 实战指南 | 基础入门、第二篇回顾 Apache Beam 实战指南 | 玩转 KafkaIO 与 Flink、第三篇回顾 Apache Beam 实战指南 | 玩转大数据存储 HdfsIO、第四篇回顾 A pache Beam 实战指南 | 如何结合 ClickHouse 打造“AI 微服务”?
一.概述关于 Apache Beam 实战指南系列文章前端
随着大数据 2.0 时代悄然到来,大数据从简单的批处理扩展到了实时处理、流处理、交互式查询和机器学习应用。近年来涌现出诸多大数据应用组件,如 HBase、Hive、Kafka、Spark、Flink 等。开发者常常要用到不一样的技术、框架、API、开发语言和 SDK 来应对复杂应用的开发,这大大增长了选择合适工具和框架的难度,开发者想要将全部的大数据组件熟练运用几乎是一项不可能完成的任务。java
面对这种状况,Google 在 2016 年 2 月宣布将大数据流水线产品(Google DataFlow)贡献给 Apache 基金会孵化,2017 年 1 月 Apache 对外宣布开源 Apache Beam,2017 年 5 月迎来了它的第一个稳定版本 2.0.0。在国内,大部分开发者对于 Beam 还缺少了解,社区中文资料也比较少。InfoQ 指望经过 Apache Beam 实战指南系列文章 推进 Apache Beam 在国内的普及。linux
其余行业问我们 IT 具体干什么的,不少 IT 人员会自嘲本身就是“搬砖”(此处将复制代码称为搬砖)的民工。过了两天 GitHub 出现自动写代码的人工智能,IT 程序员深深叹了一口气说道“完了要失业了,代码没得搬了”。其实从入行 IT 那一刻起,无论咱们作前端、服务端、底层架构等任何岗位,其实咱们都是为数据服务的服务人员(注:不是说从民工转岗到服务员了):把数据从后端搬到前端,把前端数据再写入数据库。尽管编程语言从 C、C++、C#、JAVA、Python 不停变化,为了适应时代背景框架也是变幻无穷,咱们拼命从“亚马逊热带雨林”一直学到“地中海”。程序员
而后 Apache Beam 这个一统“地中海”的框架出现了。Apache Beam 不光统一了数据源,还统一了流批计算。在这个数据传输过程当中有一条核心的技术就是管道(Pipeline),不论是 Strom,Flink ,Beam 它都是核心。在这条管道中能够对数据进行过滤、净化、清洗、合并、分流以及各类实时计算操做。数据库
本文会详细介绍如何设计 Apache Beam 管道、管道设计工具介绍、源码和案例分析,普及和提高你们对 Apache Beam 管道的认知。编程
二.怎样设计好本身的管道?设计管道注意事项图 2-1 简单管道后端
1. 你输入的数据存储在那里?首先要肯定你要构造几条数据源,在 Beam 能够构建多条,构建以前能够选择本身的 SDK 的 IO。微信
2. 你的数据类型是什么样的?Beam 提供的是键值对的数据类型,你的数据多是日志文本、格式化设备事件、数据库的行,因此在 PCollection 就应该肯定数据集的类型。网络
3. 你想怎么处理数据?对数据进行转换、过滤处理、窗口计算、SQL 处理等。在管道中提供了通用的 ParDo 转换类,算子计算以及 BeamSQL 等操做。架构
4. 你打算把数据最后输出到哪里去?在管道末尾进行 Write 写入操做,把数据最后写入你本身想存放或最后流向的地方。
管道的几种玩法1. 分支管道:屡次转换,处理相同的数据集图 2-2-1 屡次转换处理相同数据示意图
描述:例如上图 2-1-1 图所示,从一个数据库的表读取或转换数据集,而后从数据集中分别找找以字母“A”开头的数据放入一个分支数据集中,若是以字母“B”开头的数据放入另外一个分支数据集中,最终两个数据集进行隔离处理。
数据集:
// 为了演示显示内存数据集
final List<String> LINES = Arrays.asList(
"Aggressive",
"Bold",
"Apprehensive",
"Brilliant");
示例代码:
PCollection<String> dbRowCollection = ...;// 这个地方能够读取任何数据源。
PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("A")){// 查找以"A"开头的数据
c.output(c.element());
System.out.append("A 开头的单词有:"+c.element()+"\r");
}
}
}));
PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("B")){// 查找以"A"开头的数据
c.output(c.element());
System.out.append("B 开头的单词有:"+c.element()+"\r");
}
}
}));
最终结果展现:
A 开头的单词有:Aggressive
B 开头的单词有:Bold
A 开头的单词有:Apprehensive
B 开头的单词有:Brilliant
原示例代码地址 :pipelineTest2_1
2. 分支管道:一次转换,输出多个数据集图 2-2-2 一次转换多个输出示意图
描述:根据图 2-2-1 和图 2-2-2 图中能够看出,他们以不一样的方式执行着相同的操做,图 2-2-1 中的管道包含两个转换,用于处理同一输入中的元素 PCollection。一个转换使用如下逻辑:
if(以'A'开头){outputToPCollectionA}
另外一个转换为
if(以'B'开头){outputToPCollectionB}
由于每一个转换读取整个输入 PCollection,因此输入中的每一个元素都会 PCollection 被处理两次。图 2-2-2 中的管道以不一样的方式执行相同的操做 - 只有一个转换使用如下逻辑:
if(以'A'开头){outputToPCollectionA} else if(以'B'开头){outputToPCollectionB}
其中输入中的每一个元素都 PCollection 被处理一次。
数据集:同 2-1-1 数据集
示例代码:
// 定义两个 TupleTag,每一个输出一个。
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
PCollectionTuple mixedCollection =
dbRowCollection.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("A")) {
// 返回首字母带有"A"的数据集。
c.output(c.element());
} else if(c.element().startsWith("B")) {
// // 返回首字母带有"B"的数据集。
c.output(startsWithBTag, c.element());
}
}
})
// Specify main output. In this example, it is the output
// with tag startsWithATag.
.withOutputTags(startsWithATag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);
若是每一个元素的转换计算很是耗时,则使用其余输出会更有意义,由于一次性过滤所有数据,比所有数据过滤两次从性能上和转换上都存在必定程度上提高,数据量越大越明显。
最终结果展现:
A 开头的单词有:Apprehensive
A 开头的单词有:Aggressive
B 开头的单词有:Brilliant
B 开头的单词有:Bold
原示例代码地址 :pipelineTest2_2
3. 合并管道:多个数据集,合并成一个管道输出图 2-2-3 多数据集合并输出图
描述:
上图 2-2-3 是接图 2-2-1 的继续,把带“A” 的数据和带“B” 字母开头的数据进行合并到一个管道。这个地方注意点是 Flatten 用法必须两个数据的数据类型相同。
数据集:
// 为了演示显示内存数据集
final List<String> LINESa = Arrays.asList(
"Aggressive",
"Apprehensive");
final List<String> LINESb = Arrays.asList(
"Bold",
"Brilliant");
示例代码:
// 将两个 PCollections 与 Flatten 合并
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
.apply(Flatten.<String>pCollections());
// 继续合并新的 PCollection
mergedCollectionWithFlatten.apply(...);
结果展现:
合并单词单词有:
Aggressive
Brilliant
Apprehensive
Bold
原示例代码地址 :pipelineTest2_3
4. 合并管道:多个数据源,连接合并一个管道输出图 2-2-4 多数据源合并输出图
描述:
你的管道能够从一个或多个源读取或输入。若是你的管道从多个源读取而且这些源中的数据相关联,则将输入链接在一块儿会颇有用。在上面的图 2-2-4 所示的示例中,管道从数据库表中读取名称和地址,并从 Kafka 主题中读取名称和订单号。而后管道 CoGroupByKey 用于链接此信息,其中键是名称 ; 结果 PCollection 包含名称,地址和订单的全部组合。
示例代码:
PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);管道的设计工具
PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();
// 将集合值合并到 CoGbkResult 集合中。
PCollection<KV<String, CoGbkResult>> joinedCollection =
KeyedPCollectionTuple.of(addressTag, userAddress)
.and(orderTag, userOrder)
.apply(CoGroupByKey.<String>create());
joinedCollection.apply(...);
对于管道的设计不光用代码去实现,也能够用视图工具。如今存在的有两种一种是拓蓝公司出品叫 Talend Big Data Studio,另外一种就是免费开源的视图设计工具 kettle-beam。
三.怎样建立你的管道
Apache Beam 程序从头至尾就是处理数据的管道。本小节使用 Apache Beam SDK 中的类构建管道,一个完整的 Apache Beam 管道构建流程以下:
首先建立一个 Pipeline 对象。
不论是数据作任何操做,如“ 读取”或“ 建立”及转换都要为管道建立 PCollection 一个或多个的数 据集(PCollection<String>)。
在 Apache Beam 的管道中你能够对数据集 PCollection 作任何操做,例如转换数据格式,过滤,分组,分析或以其余方式处理数据中的每个元素。每一个转换都会建立一个新输出数据集 PCollection,固然你能够在处理完成以前进行作任何的转换处理。
把你认为最终处理完成的数据集写或以其余方式输出最终的存储地方。
最后运行管道。
每个 Apache Beam 程序都会从建立管道(Pipeline)对象开始。
在 Apache Beam SDK,每个管道都是一个独立的实体,管道的数据集也都封装着它的数据和对应的数据类型(在 Apache Beam 中有对应的数据转换 类型包)。最后把数据进行用于各类转换操做。
在建立的管道的时候须要设置管道选项 PipelineOptions,有两种建立方式第一种是无参数和一种有参数的。具体两种有什么不一样呢?无参数的能够在程序中指定相应的管道选项参数,如显示设置执行大数据引擎参数。有参数的就能够在提交 Apache Beam jar 程序的时候进行用 Shell 脚本的方式后期设置管道对应的参数。
具体示例以下:
无参数
// 首先定义管道的选项
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class); // 显示设置执行大数据引擎
// 建立管道实体对象
Pipeline p = Pipeline.create(options);
有参数
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().create();
提交设置参数的格式以下:
--<option>=<value>将数据读入你的管道
建立 PCollection 的初始值,请读取外部的数据源及指定的本地数据。例如读取数据库,文本文件,流数据等等,如今 Apache Beam java SDKS 支持 33 种数据源,正在接入集成的有 7 种,Python 13 种,正在集成的 1 种。基本覆盖了 IT 行业的一切数据源。例如读取文本数据咱们能够用 TextIO.Read 的方法进行读取数据。转换应用于管道对象 p 中。而且返回对应格式的数据集 PCollection:
PCollection<String> lines = p.apply(
"ReadLines", TextIO.read().from("/home/inputData.txt"));
注意: 在 Apache Beam 程序执行中,Beam 程序 2.2.0 之前版本不支持 Windows 如:D\inputData.txt 路径格式。只支持 linux 路径格式,及其余如 HDFS 等存储路径。
已经支持的数据源统计以下表:
将管道数据转换为处理的格式
不少时候直接从数据源读取的数据不会直接流入目标存储。大部分须要进行数据格式的转换,数据的清洗,数据的过滤,数据的关联处理,数据累加操做等。这里须要对源数据进行处理,处理完成的数据处理流入目标存储外还能够进行看成参数同样,传递并继续应用到管道中。
如下示例代码为 把一串数据经过转换操做赋值给 words , 而后再把 words 再次传递到下一个操做应用,再进一步进行操做的处理工做。
PCollection<String> words = ...;编写或输出管道最终输出数据的地方
PCollection<String> reversedWords = words.apply(new ReverseWords());
通过一些列的清洗、过滤、关联、转换处理工做后的数据,最终都会经过 SDKIO 进行写入管道外的存储或着数据库表。然而这种写入操做大部分都是在管道的末尾端进行操做的。
以下面代码示例,就是把管道的数据经过 Apache Beam 中的 TextIO.Write 写入 Linux 的文本文件 test.txt 中。
PCollection<String> filteredWords = ...;运行你的管道
filteredWords.apply("WriteMyFile", TextIO.write().to("/home/test.txt"));
构建管道后,使用 run 方法最后执行管道。管道以异步方式执行的。写完这一句代码后你就能够把本身的程序用 Jenkins 进行编译并提交给运行管道平台,最终有管道执行平台来运行。
运行代码示例:
p.run();
处理异步执行的方式,还有同步执行方式,是在 run 方法后面加个看守方法 waitUntilFinish。具体代码以下:
p.run().waitUntilFinish();四.怎样测试你的管道
Apache Beam 管道开发中最后的测试在整个开发中也是很是重要的一个环节。Apache Beam 的代码程序没必要要每次都进行远程构建执行到 Flink 集群上,由于管道代码的错误及 Bug 的修改在本地能更好的调试,然而每次构建到远程上面去执行是很是麻烦的事情。Apache Beam 提供 DirectRunner ,一个用于本地测试的执行引擎。
使用 DirectRunner 测试管道的时候,你能够用小规模的数据进行测试。此外你若是开发机器上装了本地的 flink ,也能够指定本地的 Flink 执行。例如测试一个简单的转换函数 DoFn,符合变换,数据源输入到管道尾端数据输出等操做。
注意点:DirectRunner 是用于管道或 Apache Beam 程序 本地开发调试测试的 数据执行引擎,不能够用于真正生产环境中运行。不然程序执行性能会大大下降,这里有坑要避开。
测试单个 Pipeline 步骤咱们开发完成管道 Beam 程序后须要本地测试,Beam SDK for Java 提供了一种方便的方法来测试 TestPipeline 的封装类。 在 Beam SDK testing 包中。
它的使用操做方法:
建立一个 TestPipeline。
建立一些已知的静态测试数据,也称为内存数据,真正应用基本是流或批数据。
使用 Create 方法建立 PCollection 输入数据。
使用 Apply 方法进行数据的转换处理而且返回指定的 PCollection。
最后使用 PAssert 去验证输出的结果是否为预期结果值。
Apache Beam 中简单的管道单元测试实例。
public class CountTest {端到端的测试管道
// 建立静态的内存数据
static final String[] WORDS_ARRAY = new String[] {
"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""};
static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
public void testCount() {
// 建立一个测试管道.
Pipeline p = TestPipeline.create();
// 建立一个输入数据集.
PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
// 添加转换统计单词个数.
PCollection<KV<String, Long>> output =
input.apply(Count.<String>perElement());
// 验证结果.
PAssert.that(output)
.containsInAnyOrder(
KV.of("hi", 4L),
KV.of("there", 1L),
KV.of("sue", 2L),
KV.of("bob", 2L),
KV.of("", 3L),
KV.of("ZOW", 1L));
// 运行整个管道.
p.run();
}
端到端的测试,主要针对输入端和输出端两端的测试。要测试整个管道,请执行如下操做:
建立一个 Beam 测试 SDK 中所提供的 TestPipeline 实例。
对于多步骤数据流水线中的每一个输入数据源,建立相对应的静态(Static)测试数据集。
使用 Create Transform,将全部的这些静态测试数据集转换成 PCollection 做为输入数据集。
按照真实数据流水线逻辑,调用全部的 Transforms 操做。
在数据流水线中全部应用到 Write Transform 的地方,都使用 PAssert 来替换这个 Write Transform,而且验证输出的结果是否咱们指望的结果相匹配
因为端到端测试跟单个 Pipeline 步骤类似就不在举示例代码。其实开发过程当中本地调试打断点,写日志测试也是更快解决问题的一个办法。
五. Apache Beam 的管道源码解析Apache Beam Pipeline 源码解析管道源代码主类是比较简单的,本文针对 Pipeline.java 进行解析。
1. 定义管道参数及管道建立在管道建立首先能够定义管道的选项,例如 Beam 做业程序的名称、惟一标识、运行引擎平台等,固然也能够提交引擎平台用命令指定也能够。而后实例化一个管道对象。
源码示例以下:
PipelineOptions options = PipelineOptionsFactory.create();2. 读取数据源
Pipeline p = Pipeline.create(options);
读取要处理的数据,有文本数据,结构化数据和非结构化数据以及流数据。做为数据处理的源数据。
源码示例以下:
PCollection<String> lines =3. 进行数据处理操做
p.apply(TextIO.read().from("gs://bucket/dir/file*.txt"));
在管道里面能够进行窗口操做、函数操做、原子操做以及 SQL 操做。
数据统计的源码示例:
PCollection<KV<String, Integer>> wordCounts =allLines4. 输出结果及运行
.apply(ParDo.of(new ExtractWords()))
.apply(new Count<String>());
源代码示例:
PCollection<String> formattedWordCounts =六.管道实战案例案例场景描述
wordCounts.apply(ParDo.of(new FormatCounts()));
formattedWordCounts.apply(TextIO.write().to("gs://bucket/dir/counts.txt"));
p.run();
随着人工智能 的不断发展,AI Cloud 在银行加快落地,安防 AI 碎片化的应用场景遍地开花。本文结合银行营业网点的业务,介绍管道案例实战。
以银行的员工脱离岗检测中的行为分析数据预处理为例。咱们去银行办理业务过程当中,首先要取号,而后叫号。叫号提示会对接系统造成一条消息回传后台,可是有时候正常办理业务期间有柜台营业员出去,而后好久才回来。这个时候摄像头会根据柜台离岗时间自动 AI 行为分析生成报警处理。
案例业务架构流程
叫号报警和行为分析报警产生的数据经过营业网点进行上报。
上传网关集群,网关集群进行转换消息格式压缩消息。
消息流入消息中心等待消费,消息中心再次起着消峰做用。
用 Beam 管道的时间窗口特性、流合并处理特性进行消息消费处理
消息进入大数据实时分析处理平台处理应用消息。
// 建立管道工厂2. 测试运行结果
PipelineOptions options = PipelineOptionsFactory.create();
// 显式指定 PipelineRunner:FlinkRunner 必须指定若是不制定则为本地
options.setRunner(DirectRunner.class); // 生产环境关闭
// options.setRunner(FlinkRunner.class); // 生成环境打开
Pipeline pipeline = Pipeline.create(options);// 设置相关管道
// 为了演示显示内存数据集
// 叫号数据
final List<KV<String, String>> txtnoticelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I", "101 号顾客请到 3 号柜台"), KV.of("DS-2CD2T26FDWDA3-IS", "102 号顾客请到 1 号柜台"),
KV.of("DS-2CD6984F-IHS", "103 号顾客请到 4 号柜台"),
KV.of("DS-2CD7627HWD-LZS", "104 号顾客请到 2 号柜台"));
//AI 行为分析消息
final List<KV<String, String>> aimessagelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I",
"CMOS 智能半球网络摄像机, 山东省济南市解放路支行 3 号柜,type=2,display_image=no"),
KV.of("DS-2CD2T26FDWDA3-IS", "CMOS 智能筒型网络摄像机, 山东省济南市甸柳庄支行 1 号柜台,type=2,display_image=no"),
KV.of("DS-2CD6984F-IHS", "星光级全景拼接网络摄像机, 山东省济南市市中区支行 4 号柜台,type=2,display_image=no"),
KV.of("DS-2CD7627HWD-LZS", "全结构化摄像机, 山东省济南市市中区支行 2 号柜台,type=2,display_image=no"));
PCollection<KV<String, String>> notice = pipeline.apply("CreateEmails", Create.of(txtnoticelist));
PCollection<KV<String, String>> message = pipeline.apply("CreatePhones", Create.of(aimessagelist));
final TupleTag<String> noticeTag = new TupleTag<>();
final TupleTag<String> messageTag = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> results = KeyedPCollectionTuple.of(noticeTag, notice).and(messageTag, message).apply(CoGroupByKey.create());
System.out.append("合并分组后的结果:\r");
PCollection<String> contactLines = results.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, CoGbkResult> e = c.element();
String name = e.getKey();
Iterable<String> emailsIter = e.getValue().getAll(noticeTag);
Iterable<String> phonesIter = e.getValue().getAll(messageTag);
System.out.append("" + name + ";" + emailsIter + ";" + phonesIter + ";" + "\r");
}
}));
pipeline.run().waitUntilFinish();
源码地址:pipelineTest2_5.java
七.小结近几年随着 AloT 发展得如火如荼,其落地场景也遍地开花。loT 做为 AI 落地先锋,已经步入线下各行各业。本文以 Beam 管道的设计切入,重点对 Beam 管道设计工具和源码进行解析,最后结合银行金融行业对 AI 碎片化的场景进行数据预处理的案例,帮助你们全面了解 Beam 管道。
做者介绍张海涛,目前就任于海康威视云基础平台,负责海康威视在全国金融行业 AI 大数据落地的基础架构设计和中间件的开发,专一 AI 大数据方向。Apache Beam 中文社区发起人之一,若是想进一步了解最新 Apache Beam 和 ClickHouse 动态和技术研究成果,请加微信 cyrjkj 入群共同研究和运用。