使用Beam,首先使用Beam SDK中编写一个Beam程序。 在Beam程序中定义了Pipeline,包括全部输入,变换和输出; 同时也包含了设置Pipeline的参数(一般使用命令行选项传递)。 包括Pipeline的执行引擎选项,用来肯定Pipeline运行在那个执行引擎上(目前支持Beam的执行引擎包括Spark Flink Apex等)。html
Beam SDK提供了一些抽象,能够简化大规模分布式数据处理的机制。 Beam用相同的抽象来统一表达批处理和流计算。 当建立Beam Pipeline时,能够根据这些抽象设计数据处理任务。 包括如下:java
Pipeline从头至尾封装整个数据处理任务。包括读取输入数据,变换数据和写入输出数据。全部Beam程序必须建立一个Pipeline。建立Piepline时,还必须指定执行选项,告诉Pipeline在哪里(如哪一种执行引擎,Spark Flink等)和如何运行(批处理或流式)。python
PCollection表示Beam Pipeline处理的的分布式数据集。数据集能够是有限的,例如来自于文件这样的再也不变化的数据源,或是无限的,这意味着它来自于经过订阅或其余机制不断更新的数据源。Pipeline一般经过从外部数据源读取数据来建立初始PCollection,也能够从Beam程序中的内存数据建立PCollection。PCollection是Pipeline中每一个步骤的输入和输出。算法
Transform表明Pipeline中的数据处理操做或步骤。每一个Transform将一个或多个PCollection对象做为输入,执行对该PCollection的元素提供的处理函数,并生成一个或多个输出PCollection对象。数据库
Beam提供Source和Sink API来分别表示读取和写入数据。 Source封装了从一些外部来源(如云端文件存储或订阅流式数据源)将数据读入Beam Pipeline所需的代码。 Sink一样封装将PCollection的元素写入外部数据接收器所需的代码。apache
典型的Beam程序的以下:编程
1. 建立Pipeline对象并设置Pipeline执行选项,包括Pipeline的执行引擎。 2. 为Pipeline建立初始数据集PCollection,使用Source API从外部源读取数据,或使用CreateTransform从内存数据构建PCollection。 3. 应用Transform到每一个PCollection。Transform能够改变、过滤、分组、分析或以其余方式处理PCollection中的元素。Transform建立一个新的输出PCollection,而不改变输入集合(函数式编程特性)。 典型的Pipeline依次将后续的Transform应用于每一个新的输出PCollection,直处处理完成。 4.输出最终的转换PCollection,通常使用Sink API将数据写入外部源。 5. 使用指定的执行引擎运行Pipeline代码。
Pipeline封装了数据处理任务中的全部数据和步骤。 Beam程序一般从构建一个Pipeline对象开始,而后使用该对象做为建立管道数据集做为PCollections的基础,并将其做为Transforms操做。缓存
要使用Beam,咱们编写的程序必须首先建立Beam SDK类Pipeline的实例(一般在main()函数中)。 建立Pipeline时,还须要设置一些配置选项。 能够以编程方式设置管道的配置选项,但提早设置选项(或从命令行读取)一般更容易,并在建立对象时将其传递给Pipeline对象。安全
// 从建立Pipeline的options开始 PipelineOptions options = PipelineOptionsFactory.create(); // 而后建立Pipeline Pipeline p = Pipeline.create(options);
管道抽象封装了数据处理任务中的全部数据和步骤。一般从构建一个PipelinePipeline对象开始,而后使用该对象做为建立管道数据集做为PCollections的基础,并将其做为Transforms操做。服务器
要使用Beam,必须首先建立Beam SDK类Pipeline的实例(一般在main()函数中)。 建立流水线时,您还须要设置一些配置选项。 您能够以编程方式设置管道的配置选项,但提早设置选项(或从命令行读取)一般更容易,并在建立对象时将其传递给管道对象。
虽然能够经过建立PipelineOptions对象并直接设置字段来配置Pipeline,但Beam SDK包含一个命令行解析器,可使用它来使用命令行参数在PipelineOptions中设置字段。
要从命令行读取选项,首先要建立一个PipelineOptions对象,如如下示例代码所示:
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
命令行的格式以下:
--<option>=<value>
<font color=red>注意: 使用 .withValidation会校验命令行的参数</font>
使用命令行的方式能够为Pipeline建立任何的参数。
除了标准的PipelineOptions以外,还能够添加自定义选项。 要添加自定义选项,须要为每一个选项定义一个带有getter和setter方法的接口,如如下示例所示:
public interface MyOptions extends PipelineOptions { String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
还能够为每个参数设定默认值和参数描述,当用户使用–help时显示的描述和默认值。设定方法以下:
public interface MyOptions extends PipelineOptions { @Description("My custom command line argument.") @Default.String("DEFAULT") String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
而后使用PipelineOptionsFactory 注册自定义参数的接口,建立PipelineOptions 的时候做为参数传递进去。只有当在PipelineOptionsFactory 中注册了接口以后,使用—help才能显示接口中定义的参数的默认值和描述,PipelineOptionsFactory 才会校验命令行中输入的参数,在全部已注册的自定义参数中是否有匹配的。
下边的代码示例中,展现了如何在PipelineOptionsFactory中注册自定义参数接口和如何使用自定义参数接口:
PipelineOptionsFactory.register(MyOptions.class); MyOptions options = PipelineOptionsFactory.fromArgs(args) .withValidation() .as(MyOptions.class);
如今就能够在Pipeline中使用 –myCustomOption=value 参数了。
PCollection抽象表示分布式数据集。 您能够将PCollection视为Pipeline数据; Bean中的Transform使用PCollection对象做为输入和输出。 所以,若是要处理Pipeline中的数据,则必须采用PCollection的形式。
建立Pipeline后,须要先建立一个至少一个PCollection。 建立的PCollection做为Pipeline中第一个操做的输入。
可使用Beam的Source API从外部源中读取数据来建立PCollection,也能够在程序中建立存储在内存中集合类中的数据的PCollection。 前者一般是在生产环境中Pipeline读取数据;Beam的源API提供了大量针对不一样数据源的适配器从外部数据源读取数据(如大型基于云的文件,数据库或订阅服务)中读取。 后者主要用于测试和调试目的。
要从外部源读取,请使用Beam提供的I / O适配器之一。 适配器的用法有所不一样,但它们的基本逻辑是读取自某些外部数据源, 以PCollection返回从源中读取的数据。
每一个数据源适配器都有一个Read Transform,要读取,必须将该Transform应用于Pipeline。 例如,TextIO.Readio.TextFileSource从外部文本文件读取并返回其元素为String类型的PCollection,每一个String表示文本文件中的一行。
如下是将TextIO.Readio.TextFileSource应用于Pipeline以建立PCollection的方法:
public static void main(String[] args) { // 建立pipeline. PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline p = Pipeline.create(options); // 使用Read Transform建立PCollection 名为'lines' PCollection<String> lines = p.apply( "ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt")); }
参考 I/O部分了解Beam支持的适配器。
从内存中的Java集合建立PCollection,可使用Beam提供的Create Transform。 很像数据适配器的Read,能够在Pipeline中使用Create。
Create接受Java Collection和Coder对象做为参数。 Coder指定如何对集合中的元素进行序列化反序列化。
要从内存中的List建立PCollection,可使用Beam提供的Create Transform。
下边的代码示例中,展现了如何从内存中的List中建立PCollection:
public static void main(String[] args) { // 建立一个Java Collection ,元素类型为String. static final List<String> LINES = Arrays.asList( "To be, or not to be: that is the question: ", "Whether 'tis nobler in the mind to suffer ", "The slings and arrows of outrageous fortune, ", "Or to take arms against a sea of troubles, "); // 建立pipeline. PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline p = Pipeline.create(options); // 使用Create Transform,用给定的字符串编码器将上边建立的Java Collectio转换为PCollection p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of()) }
PCollection由建立它的特定Pipeline对象拥有; Pipeline之间不能共享PCollection。 PCollection看起来很像集合类。 可是,PCollection和集合在几个关键方面有所不一样:
元素类型
PCollection的元素能够是任何类型的,但都必须是相同的类型。 然而,为了支持分布式处理,Beam须要可以将每一个单独的元素编码为字节串(所以元素能够传递给分布式工做人员)。 Beam SDK提供了一种数据序列化反序列化,内置了不少经常使用类型的Coder,也支持根据须要自定义Coder。
不变性
PCollection是不可变的。 建立后,没法添加,删除或更改单个元素。 Beam Transform能够处理PCollection的每一个元素并生成新的Pipeline数据(做为新的PCollection),但不会改变输入的PCollection。
随机访问
PCollection不支持随机访问单个元素。 相反,Beam Transform能够单独考虑PCollection中的每一个元素。
Size和边界
PCollection是一个大的,不可变的“包”元素。 PCollection能够包含多少元素没有上限;任何给定的PCollection能够是在单机内容可以容纳的数据集,也可能表示来自于数据存储中的很是大的分布式数据集。
PCollection能够是有限的的或无限的。PCollection表示已知固定大小的数据集,而无限PCollection表示无限大小的数据集。 PCollection是有限仍是无限取决于它所表明的数据集的来源。从批量数据源(如文件或数据库)读取可建立有界的PCollection。从流或连续更新的数据源(如Pub / Sub或Kafka)读取会建立一个无限的PCollection(除非您明确告诉它不要)。
根据PCollection的有限(或无限),Beam会采用不一样的方式处理数据。使用批处理做业来处理有限PCollection,批处理做业能够读取整个数据集一次,并在有限长度的做业中执行处理。使用持续运行的流式做业来处理无限PCollection,流式的数据永远不会在哪一时刻整个数据集是完整的,总会有数据源源不断的进来。
当对无限PCollection中的元素进行分组的操做时,Beam须要一个称为 窗口(Window)的概念,将连续更新的数据集划分为有限大小的逻辑窗口。Beam将每一个窗口处理为一个批次(bundle),而且随着数据集的生成,处理继续进行。这些逻辑窗口由与诸如时间戳之类的数据元素相关联的一些特性来肯定。
元素时间戳
PCollection中的每一个元素都具备相关联的时间戳。每一个元素的时间戳记最初由建立PCollection的数据源分配。建立无限PCollection的数据源一般会为每一个新元素分配一个对应于元素被读取或添加的时间戳。
注意:Beam 数据源在建立有限PCollection的时候,会为每一个元素自动分配时间戳。最常规的作法是,全部的元素都赋予相同的时间戳。
时间戳对于包含具备固有时间概念的元素的PCollection是有用的。 若是Pipeline正在读取一系列事件,例如推文或其余社交媒体消息,则每一个元素可能会将事件发布的时间用做元素时间戳。
若是Beam源没有分配时间戳,也能够手动将时间戳分配给PCollection的元素。 若是元素具备固有的时间戳,可是时间戳在元素自己的结构中(例如服务器日志条目中的“时间”)字段,则您须要执行此操做。 Beam提供了Transform将原始的PCollection做为输入并输出具备附加时间戳的PCollection; 有关如何执行此操做的更多信息,请参阅分配时间戳。
在BeamSDK中,Transform是Pipeline中的操做。Transform将PCollection(或多个PCollection)做为输入,对集合中的每个元素执行咱们编写的操做(代码),并生成新的输出PCollection。必须Transform应用于输入PCollection才能起做用。
Beam SDK包含许多不一样的Transform,能够将其应用于Pipeline的PCollection。包括通用的核心转换,如ParDo或Combine。还包括SDK中包含的内置的组合Transform,将一个或多个核心变换组合在有用的处理模式中,例如计数或组合集合中的元素。还能够自定义定义的更复杂的复合转换,以知足Pipeline的业务用例场景。
Beam SDK中的每一个Transform都有一个通用的apply 方法(在python中是管道符|)。调用多个Beam变换相似于方法链。通常形式以下:
[输出PCollection] = [输入PCollection].apply([Transform])
因为Beam使用PCollection的通用应用方法,所以您能够依次连接变换,也能够应用包含嵌套在其中的其余变换的转换(在Beam SDK中称为复合Transform)。
Pipeline中的处理顺序取决于Pipeline的结构,Pipeline能够理解为一张有向无环图,图中的节点是PCollection,边是Transform。以下图所示,能够在Pipeline中进行链式调用:
[最终输出PCollection] = [原始输入PCollection] .apply([First Transform]) .apply([Second Transform]) .apply([Third Transform])
Beam SDK提供了一些通用的Transform框架,能够以函数对象(俗称“用户代码”)的形式编写编写处理逻辑,处理输入的PCollection的元素。 用户代码在实际执行的时候,可能在集群中的不少不一样的worker上并行执行,具体取决于选择执行Beam Pipeline的执行引擎。 在每一个worker上运行用户代码,每一个worker输出PCollection的一部分,最终汇总成1个完整的输出PCollection。
Beam提供了如下Transform,对应于不一样的处理范式:
• ParDo
• GroupByKey
• Combine
• Flatten 和Partition
ParDo是用于并行处理的通用Beam Transform。 ParDo处理范例与Map / Shuffle / Reduce样式算法的“Map”阶段类似:ParDo转换考虑了输入PCollection中的每一个元素,对该元素执行一些处理函数(用户代码),并输出0个,1个或多个元素到输出PCollection。
ParDo可用于各类常见的数据处理操做,包括:
过滤
使用ParDo来判断PCollection中的每一个元素,是否该元素输出到新集合,或者将其丢弃。
格式化或类型转换
若是输入PCollection包含元素的类型或者格式不是所期待的,,则可使用ParDoto对每一个元素执行转换,并将结果输出到新的PCollection。.
提取数据集中数据
例如,若是有一个具备多个字段的记录的PCollection,则可使用ParDo将您想要考虑的字段解析为新的PCollection。
对数据集中的每一个元素进行处理
使用ParDo对PCollection的每一个元素或某些元素执行简单或复杂的计算,并将结果输出为新的PCollection。
在这样的场景里中,ParDo是一个通用的中间步骤。 可使用它从一组原始输入记录中提取某些字段,或将原始输入转换为不一样的格式; 还可使用ParDo将处理后的数据转换为适合输出的格式,例如如数据库表行或可打印字符串。
当进行ParDo转换时,须要以DoFn对象的形式提供用户代码。 DoFn是一个定义分布式处理功能的Beam SDK类。
在PCollection 上调用apply 方法,用ParDo 做为参数,以下代码所示:
// 元素类型为字符串类型的输入PCollection PCollection<String> words = ...; // DoFn子类,用来具体计算每1个元素的长度 static class ComputeWordLengthFn extends DoFn<String, Integer> { ... } // 使用ParDo计算PCollection "words" 中每个单词的长度 PCollection<Integer> wordLengths = words.apply( ParDo .of(new ComputeWordLengthFn()));
在该示例中,咱们的输入PCollection包含String类型的值。 咱们使用一个ParDo Transform,ParDo中使用函数(ComputeWordLengthFn)来计算每一个字符串的长度,并将结果字符串的长度做为值,输出到一个新的元素类型为Integer的PCollection中。
传递给ParDo的DoFn对象中包含对输入集合中的元素的进行处理的。 当使用Beam时,一般最重要的代码是这些DoFn函数,函数里实现了业务逻辑。
DoFn从输入的PCollection一次处理一个元素。 当建立DoFn的子类时,须要提供与输入和输出元素的类型相匹配的类型参数。 若是DoFn处理传入的String元素并生成输出集合的整数元素(像以前的例子ComputeWordLengthFn),类声明将以下所示:
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
在DoFn子类中,使用@ProcessElement注解方法,在被注解的方法中实现处理逻辑。 不须要从输入集合手动提取元素, Beam SDK已经封装好。 @ProcessElement方法应该接受类型为ProcessContext的对象。 ProcessContext对象提供了获取输入元素和发出输出元素的方法:
static class ComputeWordLengthFn extends DoFn<String, Integer> { @ProcessElement public void processElement(ProcessContext c) { // Get the input element from ProcessContext. String word = c.element(); // Use ProcessContext.output to emit the output element. c.output(word.length()); } }
注意: 若是 PCollection 的元素是key/value键值对,能够经过ProcessContext.element().getKey()获取键(key), ProcessContext.element().getValue()获取值(value)
给定的DoFn实例一般被调用一次或屡次来处理一些任意的元素组。 然而,Beam并不保证确切的调用次数; 能够在worker节点上屡次调用它,以解决故障和重试。 所以,能够将多个调用中的信息缓存处处理方法中,可是若是这样作,请确保实现不依赖于调用数量。
处理方法中须要知足一些不可变性要求,以确保Beam和执行引擎能够安全地序列化并缓存Pipeline中的值。 方法应符合如下要求:
不该以任何方式修改ProcessContext.element()或ProcessContext.sideInput()返回的元素(输入集合中的传入元素)。
使用ProcessContext.output()或ProcessContext.sideOutput()输出一个值后,不该该以任何方式修改该值。
若是功能相对简单,能够经过提供一个轻量级的DoFn做为匿名内部类实例来简化对ParDo的使用。这是之前的例子,ParDo与ComputeLengthWordsFn,DoFn指定为匿名内部类实例:
// 输入PCollection. PCollection<String> words = ...; // 建立一个匿名类处理PCollection “words”. // 输出单词的长度到新的输出PCollection PCollection<Integer> wordLengths = words.apply( "ComputeWordLengths",// Transform 的自定义名称 ParDo.of(new DoFn<String, Integer>() {// DoFn做为匿名内部类 @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().length()); } }));
若是ParDo将输入元素与输出元素进行一对一映射,即对于每一个输入元素,对应一个输出,可使用更高级的MapElements Transform。 MapElements可使用匿名的Java 8 lambda函数来进一步简化代码。
如下是使用MapElements的上一个示例:
// 输入PCollection. PCollection<String> words = ...; // 在MapElements中使用匿名lambda函数处理 PCollection “words”. //输出单词的长度到新的输出PCollection. PCollection<Integer> wordLengths = words.apply( MapElements.into(TypeDescriptors.integers()) .via((String word) -> word.length()));
注意: java8 lambda函数写法,只能在Filter,FlatMapElements和Partition使用。
GroupByKey 是一个用于处理键/值对集合的Bean Transform,是一个并行Reduce操做,相似于Map / Shuffle / Reduce-style算法的Shuffle阶段。 GroupByKey 的输入是表示多重映射的键/值对的集合,其中集合包含具备相同键但具备不一样值的多个对。给定这样的集合,可使用GroupByKey 来收集与每一个惟一键相关联的全部值。
GroupByKey 是汇总具备共同点的数据的好方法。例如,有一个存储客户订单记录的集合,须要未来自同一邮政编码的全部订单组合在一块儿(其中键/值对的键(key)是邮政编码字段,而值(value)是记录的剩余部分)。
来看一下GroupByKey 的一个简单的例子,其中咱们的数据集由文本文件中的单词和出现的行号组成。咱们想将全部共享相同单词(键)的行号(值)组合在一块儿,让咱们看到文本中出现特定单词的全部位置。
输入是一个键/值对的PCollection ,其中每一个单词都是一个键,该值是该文本出现的文件中的行号。如下是输入集合中的键/值对列表:
cat, 1 dog, 5 and, 1 jump, 3 tree, 2 cat, 5 dog, 2 and, 2 cat, 9 and, 6 ...
GroupByKey 使用相同的键收集全部值,并输出一个新的键值对,最后输出一个包含惟一键和与输入集合中的该关键字所关联的全部值的集合。 若是咱们将GroupByKey 应用于上面的输入集合,则输出集合将以下所示:
cat, [1,5,9] dog, [5,2] and, [1,2,6] jump, [3] tree, [2] ..
所以,GroupByKey表示从多重映射(多个键到各个值)到单一映射(惟一键到值集合)的转换。
CoGroupByKey关联两个或多个具备相同键类型的键/值PCollection,而后输出KV<K, CoGbkResult>集合。 Design Your Pipeline展现了如何在Pipeline中使用Join。
以下两个PCollection:
// collection 1 user1, address1 user2, address2 user3, address3 // collection 2 user1, order1 user1, order2 user2, order3 guest, order4 ...
CoGroupByKey从全部PCollection中收集具备相同键的值,并输出一个由惟一键和包含与该键相关联的全部值的对象CoGbkResult组成的对。 若是将CoGroupByKey应用于上述输入集合,则输出集合将以下所示:
user1, [[address1], [order1, order2]] user2, [[address2], [order3]] user3, [[address3], []] guest, [[], [order4]] ...
键/值对的注意事项:根据使用的语言和SDK不一样,Beam表示键/值的方式对略有不一样。 在Beam SDK for Java中,使用KV<K, V>类型的对象来表示一个键/值对。 在Python中,使用2-tuple表示键/值对。
Combine是一种用于组合数据中元素或值集合的Beam Transform。Combine有一种实现是对键值对PCollection进行处理,根据键值对中的键组合值。
应用Combine Transform时,必须提供一个函数用于组合元素或者键值对中的值。组合函数应该知足交换律和结合律,由于函数不必定在给定键的全部值上精确调用一次。因为输入数据(包括价值收集)能够分布在多个worker之间,因此在每一个worker上都会计算出部分结果,因此能够屡次调用Combine函数,以在值集合的子集上执行部分组合。Beam SDK还提供了一些预先构建的组合功能,用来对数值型的PCollection进行组合,如sum,min和max。
简单的组合操做(如sum)一般能够实现为一个简单的功能。更复杂的组合操做可能须要建立一个具备与输入/输出类型不一样的累加类型的CombineFn 的子类。
// Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction. public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> { @Override public Integer apply(Iterable<Integer> input) { int sum = 0; for (int item : input) { sum += item; } return sum; } }
经过继承CombineFn 类能够实现复杂的组合功能。如须要一个复杂的累加器,必须进行预处理或者后处理,输出的类型和输入的类型不同,组合的时候须要考虑键值对的键(key)等,则须要使用CombineFn来实现。
组合由4种操做组成。当实现一个CombineFn 的子类的时候必须重写这4个操做:
代码示例以下:
public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> { public static class Accum { int sum = 0; int count = 0; } @Override public Accum createAccumulator() { return new Accum(); } @Override public Accum addInput(Accum accum, Integer input) { accum.sum += input; accum.count++; return accum; } @Override public Accum mergeAccumulators(Iterable<Accum> accums) { Accum merged = createAccumulator(); for (Accum accum : accums) { merged.sum += accum.sum; merged.count += accum.count; } return merged; } @Override public Double extractOutput(Accum accum) { return ((double) accum.sum) / accum.count; } }
一般状况下,对元素为键值对的PCollection 进行组合运算,CombineFn就够了。有一些特殊状况下,须要根据不一样的key作不一样的处理,例如对某些用户计算最小值,对另外的用户计算最大值。使用KeyedCombineFn 能够在代码中获取到key。
对于输入的PCollection,使用全局的组合运算,最终输出只有1个值的PCollection。以下例所示,使用Beam SDK中内置的Sum组合运算,处理输入的PCollection,最终获得一个元素类型为Integer的PCollection:
// Sum.SumIntegerFn() combines the elements in the input PCollection. // The resulting PCollection, called sum, contains one value: the sum of all the elements in the input PCollection. PCollection<Integer> pc = ...; PCollection<Integer> sum = pc.apply( Combine.globally(new Sum.SumIntegerFn()));
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());
非全局窗口
若是输入PCollection使用任何非全局窗口函数,则Beam不提供默认行为。 进行组合运算时,必须指定如下选项之一:
一、指定.withoutDefaults,其中输入PCollection中为空的窗口在输出集合中一样为空。
二、指定.asSingletonView,其中输出当即转换为PCollectionView,当用做边输入时,它将为每一个空窗口提供默认值。 通常来讲,若是Pipeline组合运算的结果在后面的Pipeliine中被用做旁路输入(side inputs),那么一般只须要使用此选项。
在建立以key分组的集合(例如,经过使用GroupByKey Transform)以后,常规模式是将与每一个key相关联的值的集合合并成单个值。 根据GroupByKey的前一个例子,一个名为groupingWords的按键组合的PCollection以下所示:
cat, [1,5,9] dog, [5,2] and, [1,2,6] jump, [3] tree, [2] ...
在上述PCollection中,每一个元素都有一个字符串类型的键(例如“cat”)和一个可迭代的整数集合(在第一个元素中,包含[1,5,9])。 若是Pipeline的下一个处理步骤组合这些值(而不是单独考虑它们),则能够组合整数集合,以建立要与每一个键配对的单个合并值。 GroupByKey而后接着对值进行合并,这种处理模式至关于Beam的Combine PerKey转换。 Combine PerKey提供的Combine函数必须是知足结合律的Reduce函数或CombineFn的子类。
// 对PCollection按照key进行分组,对每一个分组中的Double类型的值进行求和 ,值类型与以前同样 PCollection<KV<String, Double>> salesRecords = ...; PCollection<KV<String, Double>> totalSalesPerPerson = salesRecords.apply(Combine.<String, Double, Double>perKey( new Sum.SumDoubleFn())); // 聚合以后的值与PCollection原始值的类型不一样 // PCollection的元素为KV类型的,Key是String,Value是Integer,聚合以后的值是Double PCollection<KV<String, Integer>> playerAccuracy = ...; PCollection<KV<String, Double>> avgAccuracyPerPlayer = playerAccuracy.apply(Combine.<String, Integer, Double>perKey( new MeanInts())));
Flatten和Partition是存储相同数据类型的PCollection对象的的Beam Transform。 Flatten将多个PCollection对象合并到1个PCollection中,而且Partition将单个PCollection拆分为固定数量的较小集合。
以下代码示例,展现了如何将Flatten应用在PCollection上。
// Flatten接受一个PCollectionList,PCollectionList是一组具备相同元素类型的PCollection //将PCollectionList中全部子PCollection的元素放到一个新的PCollection中,并返回这个新的PCollection PCollection<String> pc1 = ...; PCollection<String> pc2 = ...; PCollection<String> pc3 = ...; PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3); PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
合并以后的PCollection中数据编码
默认状况下,输出PCollection的Coder与输入PCollectionList中第一个PCollection的Coder相同。 可是,输入的PCollection对象能够分别使用不一样的Coder,只要Java包含相同的数据类型便可。
合并窗口集合
当使用Flatten合并应用了窗口策略的PCollection对象时,要合并的全部PCollection对象必须使用兼容的窗口策略和窗口大小。 例如,合并的全部集合必须所有使用(假设)相同的5分钟长度固定窗口或4分钟长度的滑动窗口每30秒滑动一次。
若是Pipiline尝试使用Flatten将PCollection对象与不兼容的窗口合并,则当构建Pipeline时,Beam会生成IllegalStateException错误。
Partition用来切分PCollection。 Partition功能包含肯定如何将输入PCollection的元素分解为每一个生成的分区PCollection的逻辑。 分区数必须在Pipeline构建时肯定。 例如,能够在运行时将分区数做为命令行选项传递(而后用于构建Pipeline图),但不能在运行时流水线中的再肯定分区数(基于之后计算的数据) 例如,您的流水线图是构建的)。
// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function. // In this example, we define the PartitionFn in-line. // Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects. PCollection<Student> students = ...; // Split students up into 10 partitions, by percentile: PCollectionList<Student> studentsByPercentile = students.apply(Partition.of(10, new PartitionFn<Student>() { public int partitionFor(Student student, int numPartitions) { return student.getPercentile() // 0..99 * numPartitions / 100; }})); // You can extract each partition from the PCollectionList using the get method, as follows: PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
当编写一个Beam Transform代码时,须要理解最终的代码是要分布式执行的。 例如,编写的代码,会生成不少副本,在不一样的机器上并行执行,相互独立,而无需与任何其余机器上的副本通讯或共享状态。 根据Pipeline的执行引擎,能够选择Pipeline,代码的每一个副本可能会重试或运行屡次。 所以应该谨慎地在代码中包括状态依赖关系。
简单来讲,编写的代码至少要知足如下两个要求:
• 函数必须是可序列化的。 • 函数必须是线程兼容的,Beam SDK并非线程安全的。
除了同样的要求,强烈建议函数是知足幂等特性。
注意:以上的要求适用于DoFn(ParDo 内使用),ConbineFn( Combine 内使用)和WindowFn(Window 内使用)
提供给Transform的任何函数必须是彻底可序列化的。 这是由于函数的副本须要序列化并传输处处理集群中的远程worker。 用户代码的父类,如DoFn,CombineFn和WindowFn,已经实现了Serializable; 可是,在子类不能添加任何不可序列化的成员。
须要时刻记住的序列化要点以下:
• 函数对象中的瞬态字段不会传输到工做实例,由于它们不会自动序列化。 • 在序列化以前避免加载大量数据的字段。 • 函数对象实例之间不能共享数据。 • 函数对象在应用后会变得无效。 • 经过使用匿名内部类实例来内联声明函数对象时要当心。 在非静态上下文中,内部类实例将隐含地包含一个指向封闭类和该类的状态的指针。 该内部类也将被序列化,所以适用于函数对象自己的相同注意事项也适用于此外部类。
编写的函数应该兼容线程的特性。在执行时,每个worker会启动一个线程执行代码,若是想实现多线程,须要在代码中本身实现。可是Beam SDK不是线程安全的,因此实现多线程须要开发者本身控制同步。注意,静态变量并不会序列化传递到不一样的worker上,还可能会被多个线程使用。
强烈建议开发者编写的函数符合幂等性—即不管重复执行多少次都不会带来意外的反作用。Beam模型中,并不能保证函数的执行次数,鉴于此,符合幂等性,可让Pipeline的是肯定的,Transform的行为是可预测的,更容易测试。
PCollection主输入PCollection,还能够以旁路输入(side inputs)的形式为ParDo Transform提供额外的输入。 旁路输入是DoFn每次处理输入PCollection中的元素时能够访问的附加输入。 当指定边输入时,能够建立一个能够在ParDo Transform的DoFn中读取的其余数据的视图,同时处理每一个元素。
若是ParDo在处理输入PCollection中的每一个元素时须要注入附加数据,旁路输入会很是有用,但须要在运行时肯定附加数据(而不是硬编码)。 这些值可能由输入数据肯定,或者取决于Pipeline的不一样分支。
// 调用.withSideInputs将为ParDo添加旁路输入 side input //在DoFn内,经过DoFn.Processecontext.sideInput可使用旁路输入 side input // ParDo的输入PCollection. PCollection<String> words = ...; //包含了单词长度的PCollection,将PCollection中的值聚合为1个值 PCollection<Integer> wordLengths = ...; // Singleton PCollection // 使用Combine.globally and View.asSingleton来计算单词的总长度,生成一个一个单例的PCollectionView final PCollectionView<Integer> maxWordLengthCutOffView = wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView()); // 在ParDo中使用maxWordLengthCutOffView做为side input. PCollection<String> wordsBelowCutOff = words.apply(ParDo .of(new DoFn<String, String>() { public void processElement(ProcessContext c) { String word = c.element(); // 在DoFn内使用side input. int lengthCutOff = c.sideInput(maxWordLengthCutOffView); if (word.length() <= lengthCutOff) { c.output(word); } } }).withSideInputs(maxWordLengthCutOffView) );
窗口化的PCollection多是无限的,所以不能被压缩成单个值(或单个集合类)。当建立一个基于窗口化PCollection的PCollectionView时,PCollectionView表示每一个窗口的一个实例(能够是每窗口一个值,也能够是每一个窗口一个列表等)。
Beam使用主输入元素的窗口来查找旁路输入元素的适当窗口。Beam将主输入元素的窗口投影到侧面输入的窗口集合中,而后使用来自窗口的旁路输入。若是主输入和侧输入具备相同的窗口,投影将提供准确的相应窗口。然而,若是输入具备不一样的窗口,则Beam使用投影来选择最合适的旁路输入窗口。
例如,若是使用1分钟的固定时间窗口对主输入进行了窗口化,而且使用1个小时的固定时间窗口对边输入进行了窗口化,则Beam将主输入窗口映射到为旁路输入窗口,并从旁路输入中的合适窗口选择值。
若是主输入元素存在于多个窗口中,那么processElement被调用屡次,每一个窗口一次。对processElement的每一个调用都会为主输入元素投射“当前”窗口,所以可能会每次提供不一样的旁路输入视图。
若是侧面输入有多个触发器,则Beam将使用最近触发器触发的值。使用用带有触发器的单个全局窗口的旁路输入时,此功能特别有用。
虽然ParDo老是生成主输出PCollection(做为从apply方法返回值),可是也可让ParDo生成任意数量的附加输出PCollection。 若是使用具备多个输出,ParDo将返回捆绑在一块儿的全部输出PCollection(包括主输出)。
使用Tags多路输出的代码示例:
// 为了将数据元素发送给多个下游PCollection,须要建立TupleTag来标示每一个PCollection //例如若是想在ParDo中建立三个输出PCollection(1个主输出,两个旁路输出),必需要建立3个TupleTag // 下边的代码示例中展现了如何建立为3个输出PCollection建立TupleTag // 输入PCollection PCollection<String> words = ...; // 输入PCollection中低于cutoff的单词发送给主输出PCollection<String> // 若是单词的长度大于cutoff,单词的长度发送给1个旁路输出PCollection<Integer> // 若是单词一"MARKER"开头, 将单词发送给旁路输出PCollection<String> // ou final int wordLengthCutOff = 10; //为每一个输出PCollection建立1个TupleTag // 单子低于cutoff长度的输出PCollection final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){}; // 包含单词长度的输出PCollection final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){}; // 以"MARKER"开头的单词的输出PCollection final TupleTag<String> markedWordsTag = new TupleTag<String>(){}; // 将输出TupleTag传给ParDo //调用.withOutputTags为每一个输出指定TupleTag // 先为主输出指定TupleTag,而后旁路输出 //在上边例子的基础上,为输出PCollection设定tag // 全部的输出,包括主输出PCollection都被打包到PCollectionTuple中。 PCollectionTuple results = words.apply(ParDo .of(new DoFn<String, String>() { //DoFn内的业务逻辑. ... }) // 为主输出指定tag. .withOutputTags(wordsBelowCutOffTag, // 使用TupleTagList为旁路输出设定ta TupleTagList.of(wordLengthsAboveCutOffTag) .and(markedWordsTag)));
DoFn中多路输出代码示例:
// 在ParDo的DoFn中,在调用ProcessContext.output的时候可使用TupleTag指定将结果发送给哪一个下游PCollection // 在ParDo以后从PCollectionTuple中解出输出PCollection // 在前边例子的基础上,本例示意了将结果输出到主输出和两个旁路输出 .of(new DoFn<String, String>() { public void processElement(ProcessContext c) { String word = c.element(); if (word.length() <= wordLengthCutOff) { // 将长度较短的单词发送到主输出 // 在本例中,是wordsBelowCutOffTag表明的输出 c.output(word); } else { // 将长度较长的单词发送到 wordLengthsAboveCutOffTag表明的输出中. c.output(wordLengthsAboveCutOffTag, word.length()); } if (word.startsWith("MARKER")) { // 将以MARKER为开头的单词发送到markedWordsTag的输出中 c.output(markedWordsTag, word); } }}));
Transform能够嵌套,复杂变换执行多个更简单的变换(例如多个ParDo,Combine,GroupByKey或甚至其余复合Transform)。 将多个Transform嵌入到单个复合变换中可使代码更加模块化,更易于理解。
Beam SDK包含许多有用的复合转换。 有关转换列表,请参阅API参考页面:
WordCount 示例程序中的CountWordsTransform是复合Transform的示例。 CountWords是由多个嵌套Transform组成的PTransform子类。
在expand展方法中,CountWordsTransform逻辑以下:
它将 Beam SDK库中的Count Transform应用于PCollection的单词,产生一个键/值对的PCollection。 每一个键表示文本中的一个单词,每一个值表示单词在原始数据中出现的次数。
注意,这也是嵌套复合Transform的示例,由于Count自己就是复合Transform。
复合Transform的参数和返回值必须与整个变换的初始输入类型和最终返回类型相匹配,即便Transform的处理过程当中的中间数据的数据类型变化屡次。
public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override public PCollection<KV<String, Long>> expand(PCollection<String> lines) { //将每行文本分割成单词 PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // 统计每一个单词出现的次数 PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } }
要建立复合Transform,集成PTransform类,并重写expand方法,在方法中实现具体的逻辑。 而后,就能够像使用Beam SDK的内置Transform同样使用此复合Transform。
对于PTransform类类型参数,您将传递您的Transform所用的PCollection类型做为输入,并生成输出。 要将多个PCollections做为输入,或者产生多个PCollections做为输出,从多个PCollection中的选取一个PCollection的类型,做为Transform的输出类型参数。
以下例所示,Transform使用子元素为String类型的PCollection做为输入,子元素为Integer的PCollection做为输出:
static class ComputeWordLengths extends PTransform<PCollection<String>, PCollection<Integer>> { ... }
在继承PTransform子类中,须要重写expand方法。 expand方法是添加PTransform的处理逻辑的地方。 重写的expand方法必须接受适当类型的输入PCollection做为参数,并将输出PCollection指定为返回值。
如下代码示例显示如何覆盖上一个示例中声明的ComputeWordLengths类的expand方法:
static class ComputeWordLengths extends PTransform<PCollection<String>, PCollection<Integer>> { @Override public PCollection<Integer> expand(PCollection<String>) { ... // 转换逻辑 ... }
只要重写的PTransform子类中的expand方法来接受适当的输入PCollection并返回相应的输出PCollection,就能够包含任意数量的Transform。 这些变换能够包括Beam核心Transform,复合Transform或Beam SDK库中包含的Transform。
注意:PTransform的expand方法并不意味着转换用户直接调用。 相反,您应该在PCollection自己调用apply方法,以变换为参数。 这容许将转换嵌套在管道的结构中。
建立Pipeline时,常常须要从外部数据源或数据库中读取数据。 一样,Pipeline会将其结果数据输出到相似的外部数据接收器。 Beam为许多常见的数据存储类型提供读写Transform。 若是要让Pipeline读取或写入内置Transform中还不支持的数据存储格式,能够 实现自定义的读写Transform。
读取Transform从外部源读取数据并返回数据的PCollection,供Pipeline使用。 通常在Pipeline建立时读取数据是最多见的,同时也容许在Pipeline中任何须要的地方读取数据。
PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt"));
写入Transform将PCollection中的数据写入外部数据源。通常在Pipeline结束时读取数据是最多见的,同时也容许在Pipeline中任何须要的地方写入数据到外部数据源。
output.apply(TextIO.write().to("gs://some/outputData"));
许多读取Transform支持用glob运算符匹配的多个输入文件中读取数据。 请注意,glob运算符是特定于文件系统的,并遵循文件系统特定的一致性模型。 如下TextIO示例使用glob运算符(*)读取在给定位置中具备前缀“input-”和后缀“.csv”的全部匹配输入文件:
p.apply(“ReadFromText”,
TextIO.read().from("protocol://my_bucket/path/to/input-*.csv");
要未来自不一样来源的数据读取到单个PCollection中,能够分别读取每一个数据源,而后使用FlattenTransform建立合并成单个PCollection。
对于基于文件的输出数据,默认状况下,写入Transform写入多个输出文件。 将输出文件名传递给写入Transform时,文件名将用做文件的前缀。 能够经过指定一个后缀来为每一个输出文件附加一个后缀。
如下写入变换示例将多个输出文件写入到某个位置。 每一个文件都有前缀“数字”,数字标签和后缀“.csv”。
records.apply("WriteToText", TextIO.write().to("protocol://my_bucket/path/to/numbers") .withSuffix(".csv"));
当Beam的执行引擎运行Pipeline时,常常须要序列化反序列化PCollections中的中间数据,这就须要将元素转换为二进制字节码和从二进制字节码中转换。 Beam SDK使用被称为“Coders”的对象来描述如何对给定的PCollection的元素进行编码和解码。
请注意,Coder与外部数据源或汇点交互时与解析或格式化数据无关。 这种解析或格式化一般应该在诸如ParDo或MapElements之类的Transform中明确指定。
在Beam Java SDK中,Coder提供编码和解码数据所需的方法。 Java SDK为Java中的标准类型提供了Coder的实现,例如Integer,Long,Double,StringUtf8等。 能够在Coder包中找到全部可用的Coder子类。
请注意,Coder与类型不定是1:1的关系。 例如,整数类型能够有多个有效的编码器,输入和输出数据可使用不一样的整数编码器。 Transform可能使用使用BigEndianIntegerCoder输入数据,而使用VarIntCoder输出数据。
Beam要求Pipeline中的每一个PCollection都有Coder。在大多数状况下,Beam SDK可以根据PCollection元素类型或生成它的Transform来自动推断PCollection的Coder,可是在某些状况下,Pipeline的开发者须要明确指定Coder,或者开发一个自定义类型的Coder。
可使用PCollection.setCoder方法显式设置现有PCollection的Coder。请注意,没法在已完成的PCollection上调用setCoder(例如,调用.apply以后)。
可使用getCoder方法获取现有PCollection的Coder。若是Coder还没有设置且不能推断PCollection的Coder,则此方法将调用失败,并抛出IllegalStateException。
Beam SDK在尝试自动推断PCollection的Coder时使用了多种机制。
每一个Pipeline对象都有一个CoderRegistry。 CoderRegistry表示Java类型与Pipeline应用于每种类型的PCollection的默认Coder的对应关系。
默认状况下,Beam Java SDK 会自动使用Transform函数对象的类型参数(如DoFn)做为PTransform生成的PCollection的Coder。在ParDo的状况下,例如,DoFn
每一个Pipeline对象都有一个CoderRegistry对象,它将语言类型映射到Pipeline要使用的类型的默认Coder。 您能够本身使用CoderRegistry查找给定类型的默认编码器,或者为给定类型注册新的默认编码器。CoderRegistry包含了Beam Java SDK建立的Pipeline的Coder与标准Java类型的默认映射。
下表显示了标准对应关系:
Java 类型 | 默认Coder |
---|---|
Double | DoubleCoder |
Instant | InstantCoder |
Integer | VarIntCoder |
Iterable | IterableCoder |
KV | KvCoder |
List | ListCoder |
Map | MapCoder |
Long | VarLongCoder |
String | StringUtf8Coder |
TableRow | TableRowJsonCoder |
Void | VoidCoder |
byte[ ] | ByteArrayCoder |
TimestampedValue | TimestampedValueCoder |
使用CoderRegistry.getDefaultCoder方法能够获取Java类型的默认Coder。 使用Pipeline.getCoderRegistry方法能够访问Pipeline的CoderRegistry。 这样就能够基于每一个流水线肯定(或设置)Java类型的默认Coder:即“对于此Pipeline,验证是不是使用BigEndianIntegerCoder对Integer值进行编码”。
要为Pipeline为Java类型设置默认编码器,能够获取并修改管道的CoderRegistry。 可以使用Pipeline.getCoderRegistry方法获取CoderRegistry对象,而后使用CoderRegistry.registerCoder方法为目标类型注册新的Coder。
如下示例代码演示了如何为流水线的整数值设置默认Coder(在本例中为BigEndianIntegerCoder)。
PipelineOptions options = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(options); CoderRegistry cr = p.getCoderRegistry(); cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
若是Pipeline使用了自定义数据类型,则可使用@DefaultCoder注释来指定要与该类型一块儿使用的Coder。 例如,假设要使用SerializableCoder的自定义数据类型,可使用@DefaultCoder注释,以下所示:
@DefaultCoder(AvroCoder.class) public class MyCustomDataType { ... }
若是建立了一个自定义Coder来匹配数据类型,而且要使用@DefaultCoder注释,则自定义的Coder类必须实现静态Coder.of(Class )工厂方法。
public class MyCustomCoder implements Coder { public static Coder<T> of(Class<T> clazz) {...} ... } @DefaultCoder(MyCustomCoder.class) public class MyCustomDataType { ... }
窗口根据PCollection中的每一个元素的时间戳细分PCollection。 聚合运算(如GroupByKey和Combine)在每一个窗口的基础上隐式工做 - 它们将每一个PCollection做为多个有限窗口的连续过程进行处理,尽管整个集合自己多是无限大小的。
触发器用来决定什么时候在无限数据到达时发出聚合结果,使用触发器可有优化PCollection的窗口策略。 触发器容许处理迟到的数据或在窗口结束前预先计算不完整的结果。 有关详细信息,请参阅触发器部分。
一些Beam Transform,如GroupByKey和Combine,经过公共key对多个元素进行分组。 一般,分组操做将在整个数据集中具备相同key的全部元素分组。 使用无限数据集,因为新元素不断被添加而且多是无限多的(例如流数据),因此不可能在某一个时刻是PCollection包含了全部的元素,此时窗口特别有用。
在Beam模型中,任何PCollection(包括无限PCollections)均可以使用逻辑上的窗口进行切分。 PCollection中的每一个元素根据PCollection的窗口功能分配给一个或多个窗口,每一个窗口包含有限数量的元素。 分组Transform而后在每一个窗口的基础上处理PCollection的每一个元素。 GroupByKey,例如,经过键和窗口隐式地分组PCollection的元素。
注意:Beam的默认窗口行为是将PCollection的全部元素分配到单个全局窗口,并丢弃迟到的数据,即便对于无限PCollections也是如此。 在无限PCollection使用GroupByKey之类的分组变换以前,必须至少执行如下操做之一:
• 设置一个非全局的窗口函数,参见为PCollection设置窗口函数. • 设置一个非默认的 触发器,这能够防止触发窗口的默认行为(等待全部的数据到达).
若是没有为无限PCollection设置非全局窗口函数或非默认触发器,随后使用GroupByKey或Combine等分组Transform是,在构建Pipeline时将会发生错误,做业会失败。
为PCollection设置窗口函数后,下次将组合Transform应用于PCollection时,将使用窗口做为基础。 窗口分组根据须要进行。 若是您使用Window转换设置了一个窗口函数,则将每一个元素分配给一个窗口,只有在GroupByKey或Combine这样的操做中才会用到窗口。 这可能会对Pipeline产生不一样的影响。 考虑下图中的示例Pipeline:
在上述Pipeline中,使用KafkaIO读取一组键/值对来建立一个无限PCollection,而后使用WindowTransform将该窗口函数应用于该集合, 而后将ParDo应用于该集合,而后使用GroupByKey将ParDo的结果分组。 窗口函数对ParDoTransform没有影响,由于在GroupByKey须要以前,窗口实际上并无被使用。 而后,GroupByKey以后的处理就是基于键和窗口的分组。
在有限PCollections中可使用具备固定大小的窗口。 可是,请注意,窗口仅考虑附加到PCollection的每一个元素的隐式时间戳,建立固定数据集的数据源(如TextIO)会为每一个元素分配相同的时间戳。 这意味着默认的全部元素都属于单个全局窗口。
要在限数据集上使用窗口,能够为每一个元素分配本身的时间戳。 要为元素分配时间戳,请使用具备DoFn的ParDo转换,在DoFn中为每一个元素附加一个新的时间戳(例如,在Beam Java SDK中的WithTimestamps Transform)。
为了说明如何使用有限PCollection进行窗口化可能会影响Pipeline如何处理数据,以下图所示:
在上面的Pipeline中,使用TextIO读取一组键/值对来建立一个有限PCollection。 而后,使用GroupByKey对集合进行分组,并将ParDo转换应用于分组的PCollection。 在此示例中,GroupByKey建立一个惟一的键值对(值是输入元素的值的集合),而后ParDo对每一个key处理1次。
请注意,即便没有设置窗口函数,仍然有1个窗口 - PCollection中的全部元素都分配给单个全局窗口。
如今对相同的Pipeline使用窗口函数,以下图所示:
如上所示,Pipeline建立一个元素为键值对的PCollection,而后为PCollection设置一个窗口函数,GroupByKeyTransform基于窗口,经过键和窗口对PCollection的元素进行分组。 随后的ParDo Transform对每一个key应用屡次,每一个窗口一次。
可使用不一样类型的窗口来切分PCollection的元素。 Beam提供了几个窗口功能,包括:
• 固定时间窗口Fixed Time Windows • 滑动时间窗口Sliding Time Windows • 会话窗口Per-Session Windows • 单一全局窗口Single Global Window • 基于日历的时间窗口Calendar-based Windows
注意:每一个元素能够逻辑上属于多个窗口,具体取决于使用的窗口函数。 例如,滑动时间窗口建立重叠的窗口,其中能够将单个元素分配给多个窗口。
最简单的窗口形式是使用固定时间窗口:有1个持续更新的时间戳PCollection,每一个窗口能够捕获(例如)全部时间戳在5分钟时间间隔内的元素。
固定时间窗口表示数据流中一致的连续、不重叠的时间间隔。 好比5分钟固定长度窗口:无限PCollection中的全部元素,时间戳值从0:00:00到(但不包括)0:05:00属于第一个窗口,时间戳值为0的元素 :05:00(但不包括)0:10:00属于第二个窗口,依此类推。
滑动时间窗口也表示数据流中的时间间隔; 然而,滑动时间窗口能够重叠。 例如,每一个窗口可能捕获五分钟的数据,可是每十秒钟会启动一个新窗口。 滑动窗口开始的频率称为周期。 所以,示例中的窗口的时间长度为5分钟,滑动周期为10秒钟。
因为多个窗口重叠,数据集中的大多数元素将属于多个窗口。 这种窗口对于计算不断变化的数据的均值很是有用; 使用滑动时间窗口,能够在示例中计算过去5分钟的数据的运行平均值,每10秒更新一次。
会话窗口是一种在时间上非连续的窗口。 会话窗口适用于每一个key,对于在时间上呈现不规则分布的数据颇有用。 例如,表示用户鼠标活动的数据流可能具备长时间的空闲时间,而在另外一个时间范围内点击不少。 若是数据在最小时间隙以后到达,则启动一个新的窗口。
注意: 每个key由于数据在时间分布上的差别,而具备不一样的窗口。
默认状况下,PCollection中的全部数据都被分配给单一全局窗口,而且丢弃迟到的数据。 若是是有限数据集,则可使用PCollection的全局窗口默认值。
若是是无限数据集(例如来自流式数据源),也可使用单个全局窗口,但在应用聚合Transform时(如GroupByKey和Combine)时要当心。 带有默认触发器的单个全局窗口一般要求整个数据集在处理以前可用,这在连续更新数据时是不可能的。 要在使用全局窗口的无限PCollection上执行聚合,应为该PCollection指定非默认触发器。
能够经过apply窗口Transform来设置PCollection的窗口函数。 进行WindowTransform时,必须提供一个WindowFn。 WindowFn用来肯定PCollection使用哪一种窗口函数来切分PCollection,如固定或滑动时间窗口。
Beam为此处描述的基本窗口功能提供预约义的WindownFn。 若有更复杂的需求,您还能够自定义WindowFn。
设置窗口函数时,可能还须要为PCollection设置触发器(trigger)。 触发器用来决定每一个窗口什么时候被聚合和发出,而且能让窗口函数可以对迟到的数据的处理和在窗口超时前预先计算结果有更好的方式。 有关详细信息,请参阅触发器部分。
如下示例代码显示了如何1分钟长度的固定时间窗口应用在PCollection上:
PCollection<String> items = ...;
PCollection<String> fixed_windowed_items = items.apply(
Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
如下示例代码显示了如何使用滑动时间窗口将PCollection切分。 每一个窗口长度为30分钟,每5秒钟开1个新窗口:
PCollection<String> items = ...;
PCollection<String> sliding_windowed_items = items.apply(
Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));
如下示例代码显示了如何使用会话窗口切分PCollection,最小的时间跨度为10分钟:
PCollection<String> items = ...;
PCollection<String> session_windowed_items = items.apply(
Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))));
注意:会话窗口首先是基于key的,每一个key有本身的会话窗口,有多少个会话窗口,取决于数据在时间上的分布。
若是您的PCollection是有限的(大小是固定的),能够将全部元素分配给单一全局窗口。 如下示例代码显示如何为PCollection设置单一全局窗口:
PCollection<String> items = ...;
PCollection<String> batch_items = items.apply(
Window.<String>into(new GlobalWindows()));
在任何数据处理系统中,数据事件产生时间(由数据元素自己的产生的时刻,肯定的“事件时间”)与实际数据元素的处理时刻之间存在必定的滞后(“处理时间”,由系统上数据被处理的时刻决定)。此外,不能保证数据事件将按照生成的顺序在Pipeline中进行处理。
例如,假设咱们有一个使用固定时间窗口的PCollection,窗口长度为五分钟。对于每一个窗口,Beam必须在给定的窗口范围内(例如在第一个窗口的0:00和4:59之间)收集全部的数据,判断依据是事件时间。时间戳超出该范围(5:00或更晚的数据)的数据属于不一样的窗口。
然而,数据没法保证按照事件时间的顺序到达Pipeline,或者始终以可预测的延迟到达。Beam使用Watermark的概念,即认为某个窗口中的全部数据都到达的时刻。在Watermark以后的数据叫作延迟数据。
从咱们的例子中,假设咱们有一个简单的水印,假设数据时间戳(事件时间)和数据出如今Pipeline的时间(处理时间)之间大约30秒的滞后时间,那么Beam将在5 :30关闭第一个窗口。若是数据记录到达5:34,可是时间戳记会在0:00-4:59窗口(好比说3:38)中,那么该记录是延迟数据。
注意:为简单起见,咱们假设使用了一个很是简单的Watermark来估计滞后时间。实际上,PCollection的数据源决定了Watermark,而且Watermark可能更精确或更复杂。
Beam的默认窗口配置,会基于数据源的类型,尝试肯定全部数据什么时候到达,而后将Watermark提早移动到窗口的末尾。此默认配置下延迟数据会被丢弃。使用触发器(Trigger)能够修改和优化PCollection的窗口策略,来决定每一个窗口什么时候聚合并报告其结果,同时包含了窗口如何处理延迟数据的策略。
设置PCollection的窗口策略时,调用.withAllowedLateness操做来容许延迟数据。 如下代码示例演示了窗口策略,容许在窗口结束后最多两天的延迟数据。
PCollection<String> items = ...;
PCollection<String> fixed_windowed_items = items.apply(
Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))) .withAllowedLateness(Duration.standardDays(2)));
当在PCollection中设置.withAllowedLateness时,设置的容许延迟时间会向前传播到该PCollection的任何后续PCollection。 若是要稍后在Pipeline中更改容许的延迟,则必须经过显式调用Window.configure().withAllowedLateness()来修改。
无限数据源为每一个元素附加了时间戳。可是因为数据来源类型的不一样,时间戳可能不符合须要,可能须要从原始数据流中从新提取时间戳。有限数据源(例如来自TextIO的文件)不提供时间戳, 若是须要时间戳,则必须将它们添加到PCollection的元素中。
能够经过使用ParDo Transform为PCollection的元素分配新的时间戳,在ParDo Transform中添加时间戳后,输入一个新的PCollection。例如:若是Pipeline从输入文件读取日志记录,而且每一个日志记录都包含时间戳字段; 因为Pipeline从文件读取记录,文件源不会自动分配时间戳。 此时须要从每一个记录中解析时间戳字段,并使用带DoFn的ParDo Transform将时间戳附加到PCollection中的每一个元素。
PCollection<LogEntry> unstampedLogs = ...;
PCollection<LogEntry> stampedLogs =
unstampedLogs.apply(ParDo.of(new DoFn<LogEntry, LogEntry>() { public void processElement(ProcessContext c) { // 从当前处理的日志记录中解析出时间戳E Instant logTimeStamp = extractTimeStampFromLogEntry(c.element()); // 使用ProcessContext.outputWithTimestamp (而不是 // ProcessContext.output)发出带有时间错的日志记录 c.outputWithTimestamp(c.element(), logTimeStamp); } }));
当收集数据并将数据按照窗口进行分组时,Beam使用触发器来肯定什么时候发出每一个窗口的聚合结果(称为窗格)。 若是使用Beam的窗口默认设置和默认触发器,Beam会在估计全部数据到达时输出聚合结果,并丢弃该窗口的全部延迟数据。
能够为PCollections设置触发器来更改此默认行为。 Beam提供了一些内置触发器:
事件时间触发器
这类触发器根据事件时间进行触发,Beam的默认触发器是事件时间触发器。
处理时间触发器
这类触发器根据事件的处理时间(在Pipeline中的每一个阶段处理数据元素的时间)进行触发。
数据驱动触发器
这类触发器经过在数据到达每一个窗口时检查数据,并在数据知足某个属性时触发操做。 目前,数据驱动的触发器只支持在必定数量的数据元素以后触发。
复合触发器
这类触发器组合使用事件时间触发器、处理时间触发器、数据驱动触发器等。
从更高的层次看,与简单的在窗口结束时输出数据相比,触发器提供两个附加功能:
触发器容许Beam在窗口中的全部数据到达以前,先计算并发出结果。
例如,在通过一段时间以后或在必定数量的元素到达以后计算并发出,此时窗口还没有关闭.
触发器提供了在事件时间的Watermark超过窗口结束时间以后处理延迟数据的机会。
这些特性让开发者可以控制数据流和在不一样约束以前取舍:
例如,时间敏感的系统可能会使用严格的基于时间的触发器,每N秒发出一个窗口,数据的时效性的重要程度大于完整性。 数据完整性超过结果的时效性的系统可能会选择使用Beam的默认触发器,该触发器在窗口的末尾触发。还能够为无限PCollection设置触发器,该触发器使用单个全局窗口进行PCollection切分。 当但愿Pipeline在无限数据集上提供按期更新时,这可能会颇有用 - 例如,当前所拥有的数据的平均值,每N秒更新一次或每N个元素。
AfterWatermark触发器以事件时间为基础触发。 当Watermark超过窗口末尾时触发,将窗口中的数据发送到下游。Watermark是全局的进程指标,在Beam的概念中,表示输入是否完整。 AfterWatermark.pastEndOfWindow()仅在Watermark经过窗口的末尾时触发。 此外,可使用.withEarlyFirings(trigger)和.withLateFirings(trigger)来配置触发器,当Pipeline在窗口结束以前或以后收到数据,则触发器将触发。
// 在月末的时候生成帐单 AfterWatermark.pastEndOfWindow() //持续的实时产生预计帐单 .withEarlyFirings( AfterProcessingTime .pastFirstElementInPane() .plusDuration(Duration.standardMinutes(1)) // 当延迟数据到达的时候持续的修正帐单,最终帐单是准确的F .withLateFirings(AfterPane.elementCountAtLeast(1))
PCollection的默认触发是基于事件时间,当Beam的Watermark超过窗口的末尾时,发出窗口的结果,而后每当延迟数据到达时触发。
可是,若是同时使用窗口默认设置和默认触发器,则默认触发器将会发出一次,而且丢弃延迟的数据。 这是由于默认窗口配置的容许的延迟值为0.有关修改此行为的信息,请参阅处理延迟数据部分。
AfterProcessingTime触发器根据处理时间进行触发。 例如,AfterProcessingTime.pastFirstElementInPane()触发器在收到数据后通过必定的处理时间后会发出一个窗口。 处理时间由系统时钟决定,而不是数据元素的时间戳。AfterProcessingTime触发器可用于触发窗口的早期结果,特别是时间跨度很是大的窗口(如单个全局窗口)。
Beam提供了一个数据驱动的触发器AfterPane.elementCountAtLeast()。 该触发器对元素计数起做用; 它在当前窗格至少收集了N个元素后触发。 这容许窗口发出早期的结果(在全部数据已经累积以前),若是使用单个全局窗口,这可能特别有用。须要注意的是,例如,若是使用.elementCountAtLeast(50)而且只有32个元素到达,则这32个元素永远没有机会触发,若是32个元素很重要,考虑使用复合触发器来组合多个触发条件,例如“当我收到50个元素或每1秒触发”时。
使用Window Transform为PCollection设置窗口函数时,能够指定触发器。经过在Window.into()转换结果上调用方法.triggering()来设置PCollection的触发器,以下所示:
PCollection<String> pc = ...;
pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)) .triggering(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1))) .discardingFiredPanes());
此代码示例为PCollection设置基于时间的触发器,该触发器在该窗口中的第一个元素已被处理后1分钟发出结果。 代码示例中的最后一行.discardingFiredPanes()是窗口的累加模式。
当指定触发器时,还必须设置窗口的累积模式。 当触发器触发时,它将窗口的当前内容做为窗格发出。 因为触发器能够屡次触发,因此累积模式决定系统是否在触发器触发时累加窗口窗格,或者丢弃它们。
要设置窗口以累积触发器触发时生成的窗格,请在设置触发器时调用.accumulatingFiredPanes()。 要设置一个窗口来放弃已触发的窗格,请调用.discardingFiredPanes()。
咱们来看一个使用具备固定时间窗口和基于数据的触发器的PCollection的例子。 例如,若是窗口长度为10分钟,而后计算数据的均值,可是但愿在UI中更新频繁显示当前平均值,而不是每10分钟更新一次。
下图显示了具备key = X事件,到达PCollection并将其分配给窗口。 为了使图表更简单,假设事件都按顺序到达:
若是触发器设置为.accumulatingFiredPanes,触发器将在每次触发时发出如下值。 记住,每次3个新元素到达时触发器都会触发:
第1次触发: [5, 8, 3] 第2次触发: [5, 8, 3, 15, 19, 23] 第3次触发: [5, 8, 3, 15, 19, 23, 9, 13, 10]
若是触发器设置为 .discardingFiredPanes,触发器每次触发时,发出的数据以下:
第1次触发: [5, 8, 3] 第2次触发: [15, 19, 23] 第3次触发: [9, 13, 10]
若是但愿Pipeline处理Watermark超过窗口末尾后到达的数据,能够在设置窗口时设置容许的延迟时间。 这使触发器有机会处理延迟数据。 若是设置了容许的延迟时间,默认的触发器会在延迟数据到达时当即发出新的结果。
使用.withAllowedLateness() 容许的延迟时间:
PCollection<String> pc = ...;
pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)) .triggering(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1))) .withAllowedLateness(Duration.standardMinutes(30));
容许的延迟时间会向下传播,设置了PCollection的后续PCollection都会继承。 若是要稍后在Pipeline中更改容许的延迟时间,能够显式调用Window.configure().AllowedLateness()。
能够组合多个触发器来造成复合触发器,而且能够指定触发器的触发方式:重复发出结果,最多一次或其余自定义条件。
Beam包括如下复合触发器:
能够经过.withEarlyFirings和.withLateFirings向AfterWatermark.pastEndOfWindow添加额外的提早触发或延迟启动。
Repeatedly.forever指定一个永远重复执行的触发器。任何触发条件知足时,都会致使窗口发出结果,而后重置并从新开始。将Repeatedly.forever与.orFinally组合能够指定重复触发器中止的条件。
AfterEach.inOrder组合多个触发器以特定的顺序启动。每次序列中的触发器发出一个窗口,而后指向下一个触发器。
AfterFirst须要多个触发器,而且首次发出任何一个参数触发器都被知足。至关于多个触发器的逻辑OR运算。
AfterAll须要多个触发器,并在其全部参数触发器都知足时发出。至关于多个触发器的逻辑AND运算。
当Beam估计全部的数据已经到达时(即当水印经过窗口的末端)与如下二者或二者结合使用时,一些最有用的复合触发器触发一次:
Watermark超过窗口末尾以前的推测性触发,以容许更快地处理但有可能只发出部分结果(不是完整的)。
**Watermark超过窗口的末尾以后发生的延迟触发,以容许处理延迟数据
可使用AfterWatermark.pastEndOfWindow来表达此模式。 例如,如下示例代码表示在以下条件下触发:
•当Beam估计,全部的数据已经到达(Watermark超过窗口的末尾)时触发。 •通过10分钟延迟后,每一次延迟数据到达触发。 •2天后,咱们认为不再会有新数据到达,触发器中止执行。
.apply(Window
.configure()
.triggering(AfterWatermark
.pastEndOfWindow()
.withLateFirings(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(10)))) .withAllowedLateness(Duration.standardDays(2)));
触发器能够组合使用,构建其余类型的复合触发器。 如下示例代码显示了一个简单的复合触发器,每当窗格至少有100个元素或每1分钟触发1次。
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(100), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))