Flink 系列博客
Flink QuickStart
Flink双流操做
Flink on Yarn Kerberos的配置
Flink on Yarn部署和任务提交操做
Flink配置Prometheus监控
Flink in docker 部署
Flink HA 部署
Flink 常见调优参数总结
Flink 源码之任务提交流程分析
Flink 源码之基本算子
Flink 源码之Trigger
Flink 源码之Evictorjava
简介
Flink 双数据流转换为单数据流操做的运算有cogroup
, join
和coflatmap
。下面为你们对比介绍下这3个运算的功能和用法。docker
Join
:只输出条件匹配的元素对。CoGroup
: 除了输出匹配的元素对之外,未能匹配的元素也会输出。CoFlatMap
:没有匹配条件,不进行匹配,分别处理两个流的元素。在此基础上彻底能够实现join和cogroup的功能,比他们使用上更加自由。
对于join和cogroup来讲,代码结构大体以下:session
val stream1 = ... val stream2 = ... stream1.join(stream2) .where(_._1).equalTo(_._1) //join的条件stream1中的某个字段和stream2中的字段值相等 .window(...) // 指定window,stream1和stream2中的数据会进入到该window中。只有该window中的数据才会被后续操做join .apply((t1, t2, out: Collector[String]) => { out.collect(...) // 捕获到匹配的数据t1和t2,在这里能够进行组装等操做 }) .print()
下面咱们以实际例子来讲明这些运算的功能和用法。app
Join操做
Flink中的Join操做相似于SQL中的join,按照必定条件分别取出两个流中匹配的元素,返回给下游处理。
示例代码以下:socket
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream1 = env.socketTextStream("127.0.0.1", 9000).map(s => s.split(" ")).map(arr => (arr(0), arr(1))) // 1 val stream2 = env.socketTextStream("127.0.0.1", 9001).map(s => s.split(" ")).map(arr => (arr(0), arr(1))) // 2 stream1.join(stream2) .where(_._1).equalTo(_._1) // 3 .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))) // 4 .trigger(CountTrigger.of(1)) // 5 .apply((t1, t2, out: Collector[String]) => { out.collect(t1._2 + "<=>" + t2._2) // 6 }) .print() env.execute("Join Demo")
代码中有些部分须要解释,以下:ide
- 建立一个socket stream。本机9000端口。输入的字符串以空格为界分割成Array[String]。而后再取出其中前两个元素组成(String, String)类型的tuple。
- 同上。端口变为9001。
- join条件为两个流中的数据(
(String, String)
类型)第一个元素相同。- 为测试方便,这里使用session window。只有两个元素到来时间先后相差不大于30秒之时才会被匹配。(Session window的特色为,没有固定的开始和结束时间,只要两个元素之间的时间间隔不大于设定值,就会分配到同一个window中,不然后来的元素会进入新的window)。
- 将window默认的trigger修改成count trigger。这里的含义为每到来一个元素,都会马上触发计算。
- 处理匹配到的两个数据,例如到来的数据为(1, "a")和(1, "b"),输出到下游则为"a<=>b"
下面咱们测试下程序。测试
打开两个terminal,分别输入 nc -lk 127.0.0.1 9000
和 nc -lk 127.0.0.1 9001
。ui
在terminal1中输入,1 a
,而后在terminal2中输入2 b
。观察程序console,发现没有输出。这两条数据不知足匹配条件,所以没有输出。url
在30秒以内输入1 c
,发现程序控制台输出告终果a<=>c
。再输入1 d
,控制台输出a<=>c
和a<=>d
两个结果。.net
等待30秒以后,在terminal2中输入1 e
,发现控制台无输出。因为session window的效果,该数据和以前stream1中的数据不在同一个window中。所以没有匹配结果,控制台不会有输出。
综上咱们得出结论:
- join只返回匹配到的数据对。若在window中没有可以与之匹配的数据,则不会有输出。
- join会输出window中全部的匹配数据对。
- 不在window内的数据不会被匹配到。
CoGroup操做
因为测试代码基本相同,直接贴出代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream1 = env.socketTextStream("127.0.0.1", 9000).map(s => s.split(" ")).map(arr => (arr(0), arr(1))) // 1 val stream2 = env.socketTextStream("127.0.0.1", 9001).map(s => s.split(" ")).map(arr => (arr(0), arr(1))) // 2 stream1.coGroup(stream2) .where(_._1).equalTo(_._1) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))) .trigger(CountTrigger.of(1)) .apply((t1, t2, out: Collector[String]) => { val stringBuilder = new StringBuilder("Data in stream1: \n") for (i1 <- t1) { stringBuilder.append(i1._1 + "<=>" + i1._2 + "\n") } stringBuilder.append("Data in stream2: \n") for (i2 <- t2) { stringBuilder.append(i2._1 + "<=>" + i2._2 + "\n") } out.collect(stringBuilder.toString) }) .print() env.execute()
通过一样的测试咱们得出结论:
CoGroup的做用和join基本相同,但有一点不同的是,若是未能找到新到来的数据与另外一个流在window中存在的匹配数据,仍会将其输出。
CoFlatMap操做
相比之下CoFlatMap操做就比以上两个简单多了。CoFlatMap操做主要在CoFlatMapFunction中进行。
如下是CoFlatMapFunction的代码:
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable { /** * This method is called for each element in the first of the connected streams. * * @param value The stream element * @param out The collector to emit resulting elements to * @throws Exception The function may throw exceptions which cause the streaming program * to fail and go into recovery. */ void flatMap1(IN1 value, Collector<OUT> out) throws Exception; /** * This method is called for each element in the second of the connected streams. * * @param value The stream element * @param out The collector to emit resulting elements to * @throws Exception The function may throw exceptions which cause the streaming program * to fail and go into recovery. */ void flatMap2(IN2 value, Collector<OUT> out) throws Exception; }
简单理解就是当stream1数据到来时,会调用flatMap1方法,stream2收到数据之时,会调用flatMap2方法。
stream1.connect(stream2).flatMap(new CoFlatMapFunction[(String, String), (String, String), String] { override def flatMap1(value: (String, String), out: Collector[String]): Unit = { println("stream1 value: " + value) } override def flatMap2(value: (String, String), out: Collector[String]): Unit = { println("stream2 value: " + value) } }).print()
因为结果不难验证,这里就不在赘述验证过程了。
总结
Join、CoGroup和CoFlatMap这三个运算符都可以将双数据流转换为单个数据流。Join和CoGroup会根据指定的条件进行数据配对操做,不一样的是Join只输出匹配成功的数据对,CoGroup不管是否有匹配都会输出。CoFlatMap没有匹配操做,只是分别去接收两个流的输入。你们能够根据具体的业务需求,选择不一样的双流操做。