Flink Program Guide (2) -- 综述 (DataStream API编程指导 -- For Java)

 

 

DataStream API编程指导html

文档翻译自Flink DataStream API Programming Guidejava

-----------------------------------------------------------------------web

Flink中的DataStream程序是实如今数据流上的transformation(filteringupdating state defining windowsaggregating)的普通程序。建立数据流的来源多种多样(如消息队列,socket流,文件等)。程序经过data sink返回结果,如将数据写入文件,或发送到标准输出(如命令行终端)。Flink程序能够在多种上下文中运行,如独立运行或是嵌入在其余程序中执行。程序的执行能够发生在本地JVM,或者在一个拥有许多设备的集群上。算法

 

有关介绍Flink API基础概念的文档,请见basic conceptsapache

 

为了建立你本身的Flink DataStream程序,咱们鼓励你从文档anatomy of a Flink Program开始,且欢迎你添加本身的transformations。该文档接下来的部分是额外的operation和进阶特性的参考文档。编程

1、示例程序

下面的程序是一个完整的流式窗口word count应用,它计算出在web socket的大小为5秒的窗口中的出现各个单词的数量。你能够复制 & 粘贴代码并在本地运行。windows

import org.apache.flink.api.common.functions.FlatMapFunction;
import
org.apache.flink.api.java.tuple.Tuple2;
import
org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.windowing.time.Time;
import
org.apache.flink.util.Collector;api

public class WindowWordCount {缓存

public static void main(String[] args) throws Exception {性能优化

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
  .
socketTextStream("localhost", 9999)
  .
flatMap(new Splitter())
  .
keyBy(0)
  .
timeWindow(Time.seconds(5))
  .
sum(1);

dataStream.print();

env.execute("Window WordCount");

}

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override

public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {

for (String word: sentence.split(" ")) {

out.collect(new Tuple2<String, Integer>(word, 1));

}

}

}

}

要运行该示例程序,首先从终端运行netcat来开始输入流

nc -lk 9999

 

仅须要输入一些单词,这些将是word count程序的输入数据。若是你想看到count大于1的结果,在5秒内重复输入同一个单词。

 

2、DataStream Transformations

Data transformation会将一或多个DataStream转换成一个新的DataStream。程序能够将多个transformation结合造成复杂的拓扑结构(topology)。

 

本小节给出了全部可用的transformation的描述。

Transformation

描述

Map

DataStream -> DataStream

获取一个element并产出一个element。下例是一个将输入*2map方法:
 

DataStream<Integer> dataStream = //...
dataStream.
map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer
map(Integer value) throws Exception {
    return
2 * value;
  }
});

FlapMap

DataStream -> DataStream

获取一个element,并产生出01或多个element。下例是一个为句子分词的flatmap方法

 

dataStream.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public
void flatMap(String value, Collector<String> outthrows Exception {
    for(String word: value.
split(" ")){
    out.
collect(word);
    }
  }
});

Filter

DataStream -> DataStream

在每一个获取的element上运行一个boolean方法,留下那些方法返回trueelement。下例是一个过滤掉0值的filter
 

dataStream.filter(new FilterFunction<Integer>() {
  @Override
  public
boolean filter(Integer value) throws Exception {
    return value !=
0;
  }
});

KeyBy
DataStream -> KeyedStream

将流逻辑分为不相交的分区,每一个分区包含的都是具备相同keyelement,该分区方法使用hash分区实现。定义key的方法见于Keys。下例是一个返回KeyedDataStreamtransformation
 

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.
keyBy(0) // Key by the first element of a Tuple

Reduce

KeyedStream -> DataStream

一个在keyed data stream滚动进行的reduce方法。将上一个reduce过的值和当前element结合,产生新的值并发送出。下例是一个建立部分和的reduce方法。
 

keyedStream.reduce(new ReduceFunction<Integer>() {
  @Override
  public Integer
reduce(Integer value1, Integer value2throws Exception {
    return value1 + value2;
  }
});

Fold

KeyedStream -> DataStream

一个在带有初始值的数据流上滚动进行的fold方法。将上一个fold的值和当前element结合,产生新的值并发送出。下例是一个fold方法,当应用于序列{1, 2, 3, 4, 5}时,它发出序列{"start-1", "start-1-2", "start-1-2-3" …}
 

DataStream<String> result keyedStream.fold("start", new FoldFunction<Integer, String>() {
  @Override
  public String
fold(String current, Integer value) {
    return current +
"-" + value;
  }
});

Aggregations

KeyedStream -> DataStream

在一个keyed DataStream滚动进行聚合的方法。其中,minminBy的区别在于min返回最小值,而minBy返回的是带有在此域中最小值的elementmaxmaxBy同样如此)。
 

keyedStream.sum(0);
keyedStream.
sum("key");
keyedStream.
min(0);
keyedStream.
min("key");
keyedStream.
max(0);
keyedStream.
max("key");
keyedStream.
minBy(0);
keyedStream.
minBy("key");
keyedStream.
maxBy(0);
keyedStream.
maxBy("key");

Window

KeyedStream - > WindowedStream

Window能够定义在已经分区的KeyedStream上。窗口将根据一些特征(如最近5秒到达的数据)将数据按其各自的key集合在一块儿。有关窗口的完整描述见于windows
 

// Last 5 seconds of data

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));

WindowAll

DataStream -> AllWindowedStream

Window能够定义在普通的DataStream上。窗口将根据一些特征(如最近5秒到达的数据)将全部Stream事件集合在一块儿。有关窗口的完整描述见于windows
警告:该transformation在不少状况下都不是并行化的,全部数据将被收集到一个运行windowAll Operator的任务上。

 

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

Window Apply

WindowedStream -> DataStream

AllWindowedStream -> DataStream

将一个通常函数应用到window总体上去,下面是一我的工计算window中全部element的总和的应用。
注意:若是你正在使用一个windowAlltransformation,你须要使用AllWindowFunction来代替下例中的参数。
 

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
public
void apply (Tuple tuple,
  Window window,
  Iterable<Tuple2<String, Integer>> values,
  Collector<Integer> out) throws Exception {
    
int sum = 0;
    for (value t: values) {
      sum += t.
f1;
    }
    out.
collect (new Integer(sum));
  }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.
apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
public
void apply (Window window,
  Iterable<Tuple2<String, Integer>> values,
  Collector<Integer> out) throws Exception {
    
int sum = 0;
    for (value t: values) {
      sum += t.
f1;
    }
    out.
collect (new Integer(sum));
  }
});

Window Reduce

WindowedStream -> DataStream

对窗口应用一个功能性reduce方法并返回reduce的结果
 

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>() {
  public Tuple2<String, Integer>
reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
  return new Tuple2<String,Integer>(value1.
f0, value1.f1 + value2.f1);
  }
};

Window Fold

Windowed Stream -> DataStream

对窗口应用一个功能性fold方法。下例代码在应用到序列(1, 2, 3, 4, 5)时,它将该序列fold成为字符串"start-1-2-3-4-5"
 

windowedStream.fold("start-", new FoldFunction<Integer, String>() {
  public String
fold(String current, Integer value) {
    return current +
"-" + value;
  }
};

Aggregations on windows

WindowedStream -> DataStream

对窗口中的内容聚合。其中,minminBy的区别在于min返回最小值,而minBy返回的是带有在此域中最小值的elementmaxmaxBy同样如此)。
 

windowedStream.sum(0);
windowedStream.
sum("key");
windowedStream.
min(0);
windowedStream.
min("key");
windowedStream.
max(0);
windowedStream.
max("key");
windowedStream.
minBy(0);
windowedStream.
minBy("key");
windowedStream.
maxBy(0);
windowedStream.
maxBy("key");

Union

DataStream* -> DataStream

2个或多个data stream合并建立出一个新的包含全部streamelementstream。注意:若是你对一个data stream本身进行union操做,则在返回的结果中,每一个element都会出现2个。
 

dataStream.union(otherStream1, otherStream2, ...);

Window Join

DataStream, DataStream -> DataStream

在给定key和普通window中,将2DataStream进行Join操做

 

dataStream.join(otherStream)
.
where(0).equalTo(1)
.
window(TumblingEventTimeWindows.of(Time.seconds(3)))
.
apply (new JoinFunction () {...});

Window CoGroup

DataStream, DataStream -> DataStream

在给定key和普通window中,对2DataStream进行CoGroup操做。
 

dataStream.coGroup(otherStream)
.
where(0).equalTo(1)
.
window(TumblingEventTimeWindows.of(Time.seconds(3)))
.
apply (new CoGroupFunction () {...});

Connect

DataStream, DataStream -> ConnectedStreams

在保留两个DataStream的类型的状况下,将两者"链接"起来。Connect使咱们能够共享两个Stream的状态
 

DataStream<Integer> someStream = //...
DataStream<String> otherStream =
//...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap, CoFlatMap

ConnectedStreams -> DataStream

该操做相似于mapflatMap针对链接的Data Stream版本。Sd
 

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
  @Override
  public Boolean
map1(Integer value) {
    return true;
  }

  @Override
  public Boolean
map2(String value) {
    return false;
  }
});

 

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

  @Override
  public
void flatMap1(Integer value, Collector<String> out) {
    out.
collect(value.toString());
  }

  @Override
  public
void flatMap2(String value, Collector<String> out) {
    for (String word: value.
split(" ")) {
      out.
collect(word);
    }
  }
});

Split

DataStream -> SplitStream

根据某些标准将Stream分割成2个或更多的stream
 

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
  @Override
  public Iterable<String>
select(Integer value) {
    List<String> output = new ArrayList<String>();
    if (value %
2 == 0) {
      output.
add("even");
    }
    else {
      output.
add("odd");
    }
    return output;
  }
});

Select

SplitStream -> DataStream

SplitStream中选择1个或多个stream

 

SplitStream<Integer> split;
DataStream<Integer> even = split.
select("even");
DataStream<Integer> odd = split.
select("odd");
DataStream<Integer> all = split.
select("even","odd");

Iterate

DataStream -> IterativeStream -> DataStream

经过将一个Operator的输出重定向到前面的某个Operator的方法,在数据流图中建立一个反馈循环。这在定义持续更新模型的算法时十分有用。下面的例子从一个Stream开始,并持续应用迭代体(Iteration body)。大于0element被送回到反馈通道,而其余的element则被转发到下游。相关完整描述请见Iterations
 

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.
map (/*do something*/);
DataStream<Long> feedback = iterationBody.
filter(new FilterFunction<Long>(){
  @Override
  public
boolean filter(Integer value) throws Exception {
    return value >
0;
  }
});
iteration.
closeWith(feedback);
DataStream<Long> output = iterationBody.
filter(new FilterFunction<Long>(){
  @Override
  public
boolean filter(Integer value) throws Exception {
    return value <=
0;
  }
});

Extract Timestamps

DataStream -> DataStream

经过从数据中抽取时间戳来使得经过使用事件时间语义的窗口能够工做。详情见于Event Time
 

stream.assignTimestamps (new TimeStampExtractor() {...});

 

接下来的Transformation是对Tuple类型的data stream可用的Transformation

Transformation

描述

Project

DataStream -> DataStream

tuple中选择出域的子集而产生新的DataStream
 

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.
project(2,0);

 

物理级分割(Physical Partitioning

若是须要,Flink一样提供了在进行一次transformation后针对精确stream分割的低层次的控制(low-level control),它们经过如下几个方法实现。

 

Transformations

描述

Custom partitioning

DataStream -> DataStream

使用一个用户自定义的Partitioner来对每一个element选择目标任务sd
 

dataStream.partitionCustom(partitioner, "someKey");
dataStream.
partitionCustom(partitioner, 0);

Random partitioning

DataStream -> DataStream

根据均匀分布来随机分割element
 

dataStream.shuffle();

Rebalancing(轮询分割)

DataStream -> DataStream

轮询分割element,建立相同负荷的分割。对数据变形(data skew)时的性能优化十分有用s
 

dataStream.rebalance();

Rescaling

DataStream -> DataStream

element轮询分割到下游Operator子集中去。这在你想流水线并行时十分有用,例如,须要从每一个并行的source实例中将数据fan out到一个有着一些mapper来分发负载,可是又不想要函数rebalance()那样引发的彻底rebalance的效果时。这就须要仅在本地传输数据,而不是须要从网络传输,这须要依赖其余诸如TaskManager的任务槽数量等等configuration值。
上游Operation发送element的下游Operation子集同时依赖于上游和下游两方Operation的并行度。例如,若上游Operation的并行度为2,下游Operation并行度为4,则1个上游Operation将会把它的element分发给2个下游Operation。另外一方面,若下游并行度为2而上游并行度为4,则2个上游Operation将会把它们的element分发给1个下游Operation,而另外两个上游Operation则分发给另外一个下游Operation
当一个或是多个上下游Operation的并行度不是倍数关系时,下游的Operation将拥有不一样的从上游得到的输入的数量。
下图是上面例子的链接模式图:
 

dataStream.rescale();

Broadcasting

DataStream -> DataStream

element广播到每个分割中去
 

dataStream.broadcast();

 

连接任务以及资源组(Task chaining & resource groups

将两个transformation连接起来意味着将它们部署在一块儿(co-locating),共享同一个线程来得到更好的性能。Flink默认地尽量地连接Operator(如两个连续的map transformation)。若有须要,API还给出了细粒度的连接控制:

 

使用StreamExecutionEnvironment.disableOperatorChaining()来关闭整个Job的连接操做。下面表格中的方法则是更加细粒度的控制函数,注意,因为这些函数引用的是前一个transformation,因此它们仅仅在一个DataStreamtransformation后使用才是正确的,例如someStream.map( … ).startNewChain()是正确的,而someStream.startNewChain()错误的。

 

一个资源组就是Flink中的一个任务槽,若有须要,你能够人工孤立某个Operator到一个独立的任务槽中。

Transformation

描述

startNewChain()

以当前Operator起点,开始一个新的连接。在下例中,两个mapper将会被连接而filter则不会与第一个mapper连接
 

someStream.filter(...).map(...).startNewChain().map(...);

disableChaining()

下例中,将不会连接mapOperator
 

someStream.map(...).disableChaining();

slotSharingGroup()

设置一个Operation的共享任务槽的分组。Flink将会把同一个任务槽共享组的Operation放到同一个任务槽中,而不在同一个任务槽共享组的Operation放到其余任务槽中。这能够用来孤立任务槽。若是全部的输入Operation都在同一个任务槽共享组中,则该任务槽共享组会继承下来。任务槽共享组的默认名为"default"Operation能够经过调用slotSharingGroup("default")来定义其名称。
 

someStream.filter(...).slotSharingGroup("name");

 

3、数据源

数据源能够经过StreamExecutionEnvironment.addSource(sourceFunction)来建立数据源。你可使用Flink提供的source方法,也能够经过实现SourceFunction来编写自定义的非并行数据源,也能够经过实现ParallelSourceFunction接口或继承RichParallelSourceFunction来编写自定义并行数据源。

如下是几个预约义的数据流源,能够经过StreamExecutionEnvironment来访问:

1.    基于文件的:

·        readTextFile(path) / TextInputFormat - 以行读取方式读文件并返回字符串

·        readFile(path) / 任意输入格式 - 按用输入格式的描述读取文件

·        readFileStream - 建立一个stream,在文件有改动时追加element

2.    基于Socket的:

·        socketTextStream - socket读取,element能够经过分割符来分开

3.    基于Collection的:

·        fromCollection(Collection) - Java.util.Collection建立一个数据流。collection中全部的element都必须是同一类型的。

·        fromCollection(Iterator, Class) - 从一个迭代器中建立一个数据流。class参数明确了迭代器返回的element的类型。

·        fromElement(T …) - 从一个给定的对象序列建立一个数据流。全部对象都必须是同一类型的。

·        fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中建立一个并行数据流。class参数明确了迭代器返回的element的类型。

·        generateSequence(from, to) - 从一个给定区间中生成一个并行数字序列。

4.    自定义:

·        addSource - 附上一个新的source方法。例如,经过调用addSource(new FlinkKafkaConsumer08<>(…))来从Apache Kafka读取数据,更多信息见于connector

 

4、Data Sink

Data Sink消耗DataStream并将它们转发到文件、socket、外部系统或打印它们。Flink自带了许多内置的输出格式,封装为DataStreamoperation中:

·        writeAsText() / TextOutputFormat - 以行字符串的方式写文件,字符串经过调用每一个elementtoString()方法得到。

·        writeAsCsv(…) / CsvOutputFormat - 以逗号分隔的值来说Tuple写入文件,行和域的分隔符是能够配置的。每一个域的值是经过调用objecttoString()方法得到的。

·        print() / printToErr() - 将每一个elementtoString()值打印在标准输出 / 标准错误流中。能够提供一个前缀(msg)做为输出的前缀,使得在不一样print的调用能够互相区分。若是并行度大于1,输出也会以task的标识符(identifier)为产生的输出的前缀。

·        writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出所用的方法和基类,支持自定义objectbyte的转换。

·        writeToSocket - 依据SerializationSchemaelement写到socket中。

·        addSink - 调用自定义sink方法,Flink自带链接到其余系统的connector(如Apache Kafka),这些connector都以sink方法的形式实现。

 

注意DataStreamwrite*()函数主要用于debug,它们不参与Flink的检查点,这意味着这些方法一般处于至少一次(at-least-once的执行语义下。flush到目标系统的数据依赖于OutputFormat的实现,这意味着不是全部发送到OutputFormatelement都会当即出如今目标系统中,此外,在失效的状况下,这些数据极可能会丢失。

 

故为了可靠性以及将stream“刚好一次(exact once地传入文件系统,咱们应当使用flink-connector-filesystem。此外,经过实现.addSink(…)的自定义内容会参加Flink的检查点机制,故会保证刚好一次的执行语义。

 

5、迭代(Iterations

迭代流程序实现了一个阶段方法并将之嵌入到一个IterativeStream中。做为一个可能永远不会结束的程序,它没有最大迭代数,反之,你须要使用splitfiltertransformation来明确流的哪一部分会被反馈到迭代中,哪一部分则继续转发到下游。这里,咱们使用filter做为例子,咱们定义IterativeStream

IterativeStream<Integer> iteration = input.iterate();

而后,咱们定义在循环中将要进行的逻辑处理,咱们经过一系列transformation来实现(这里用了一个简单的map transformation):

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

 

咱们能够调用IterativeStreamcloseWith(feedbackStream)函数来关闭一个迭代并定义迭代尾。传递给closeWith方法的DataStream将会反馈回迭代头。分割出用来反馈的stream的部分和向前传播的stream部分一般的方法即是使用filter来进行分割。这些filter能够定义诸如"termination"逻辑,即element将会传播到下游,而不是被反馈回去。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.
filter(/* some other part of the stream */);

 

默认地,反馈的那部分流将会自动设置为迭代头的输入,要想重载该行为,用户须要设置closeWith函数中的一个boolean参数。例如,下面是一个持续将整数序列中的数字减1知道它们变为0的程序:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long
map(Long value) throws Exception {
    return value -
1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public
boolean filter(Long value) throws Exception {
    return (value >
0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public
boolean filter(Long value) throws Exception {
    return (value <=
0);
  }
});

 

6、执行参数

StreamExecutionEnvironment包含ExecutionConfig,它可使用户设置job的确切运行时配置值。

请参考execution configuration来查看参数的解释。特别的,如下这些参数仅适用于DataStream API

enableTimestamps() / disableTimestamps():在每个source发出的事件上附加上一个时间戳。函数areTimestampsEnabled()能够返回该状态的当前值。

setAutoWatermarkInterval(long milliseconds):设置自动水印发布(watermark emission)区间。你能够经过调用函数getAutoWatermarkInterval()来获取当前值。

 

6.1 容错

文档Fault Tolerance Documentation描述了打开并配置Flink的检查点机制的选项和参数

 

6.2 控制执行时间

默认的,element在网络传输时不是一个个单独传输的(这会致使没必要要的网络流量),而是缓存后传输。缓存(是在设备间传输的实际单位)的大小能够在Flink的配置文件中设置。尽管该方法有益于优化吞吐量,他会在stream到达不够快时致使执行时间方面的问题。为了控制吞吐量和执行时间,你能够在执行环境(或独立的Operator)中调用env.setBufferTimeout(timeoutMillis)来设置等待装满buffer的最大等待时间,在这个时间事后,无论buffer是否已满,它都会自动发出。该默认超时时间是100ms。下例是设置API的用法:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.
setBufferTimeout(timeoutMillis);

env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

 

要最大化吞吐量,设置setBufferTimeout(-1)来去除超时时间,则buffer仅在它满后才会被flush。要最小化执行时间,设置timeout为一个接近0的数字(如5ms10ms)。应当避免设置Timeout0,由于它会形成严重的性能降低。

 

7、Debugging

在分布式集群上运行流程序以前,确保算法正确执行很重要。所以,实现数据分析程序一般须要递增的检查结果、debug、优化的过程。

Flink提供了能够显著简化数据分析程序的开发过程的特性,便可以在IDE中本地进行debug、注入测试数据、以及结果数据的收集等。本节对如何简化Flink程序开发提出几点建议。

 

7.1 本地执行环境

LocalStreamEnvironment在建立它的同一个JVM进程下建立Flink系统。若是你从IDE中启动一个LocalEnvironment,你能够在代码中设置断点来简单地debug你的程序。下例为LocalEnvironment是如何建立并使用的:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();

 

7.2 Collection数据源

Flink提供基于Java collection的特殊数据源来方便测试。一旦程序测试以后,sourcesink能够简单地替代为对外部系统的读取/写出的sourcesinkCollection数据源使用方法以下:

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.
fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.
fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.
fromCollection(longIt, Long.class);

 

注意:当前Collection数据源须要实现Serializable接口的数据类型和迭代器。此外,Collection数据源没法并行执行(并行度=1

 

7.3 迭代器Data Sink

Flink一样提供了一个收集测试和debugDataStream结果的sink,它的使用方式以下:

import org.apache.flink.contrib.streaming.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.
collect(myResult)

相关文章
相关标签/搜索