Flink Join

一.简介

Flink DataStream API中内置有两个能够根据实际条件对数据流进行Join算子:基于间隔的Join和基于窗口的Join。java

语义注意事项apache

  • 建立两个流元素的成对组合的行为相似内链接,若是来自一个流的元素与另外一个流没有相对应要链接的元素,则不会发出该元素。
  • 结合在一块儿的那些元素将其时间戳设置为位于各自窗口中最大时间戳。例如:以[5,10]为边界的窗口将产生链接的元素的时间戳为9。

二.窗口Join

2.1 翻滚窗口(Tumbling Window Join)

执行滚动窗口链接(Tumbling Window Join)时,具备公共Key和公共Tumbling Window的全部元素都以成对组合形式进行链接,并传递给JoinFunction或FlatJoinFunction。由于这就像一个内链接,在滚动窗口中没有来自另外一个流的元素的流的元素不会被输出。api

图片

如图所示,咱们定义了一个大小为2毫秒的滚动窗口,其结果为[0,1],[2,3], …。该图像显示了每一个窗口中全部元素的成对组合,这些元素将传递给JoinFunction。注意,在翻滚窗口[6,7]中没有发出任何内容,由于在绿色流中没有元素与橙色元素⑥、⑦链接。微信

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply {  (e1, e2) => e1 + "," + e2 }

2.2 滑动窗口Join(Sliding Window Join)

在执行滑动窗口链接(Sliding Window Join)时,具备公共Key和公共滑动窗口(Sliding Window )的全部元素都做为成对组合进行链接,并传递给JoinFunction或FlatJoinFunction。当前滑动窗口中没有来自另外一个流的元素的流的元素不会被发出。app

注意,有些元素可能会在一个滑动窗口中链接,但不会在另外一个窗口中链接!socket

图片

在本例中,咱们使用的滑动窗口大小为2毫秒,滑动1毫秒,滑动窗口结果[1,0],[0,1],[1,2],[二、3],… x轴如下是每一个滑动窗口的Join结果将被传递给JoinFunction的元素。在这里你还能够看到橙②与绿色③窗口Join(二、3),但不与任何窗口Join[1,2]。ide

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply {  (e1, e2) => e1 + "," + e2 }

2.3 会话窗口Join(Session Window Join)

在执行会话窗口链接时,具备相同键的全部元素(当“组合”时知足会话条件)都以成对的组合进行链接,并传递给JoinFunction或FlatJoinFunction。再次执行内部链接,所以若是会话窗口只包含来自一个流的元素,则不会发出任何输出。测试

图片

在这里,定义一个会话窗口链接,其中每一个会话被至少1ms的间隔所分割。有三个会话,在前两个会话中,来自两个流的链接元素被传递给JoinFunction。在第三次会话中绿色流没有元素,因此⑧⑨不会Join。大数据

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply {  (e1, e2) => e1 + "," + e2 }

2.4.小结

除了对窗口中两条流进行Join,你还能够对它们进行Cogroup,只需将算子定义开始位置的Join()改成coGroup()便可,Join和Cogroup的整体逻辑相同。spa

两者区别:Join会为两侧输入中每一个事件对调用JoinFunction;而Cogroup中CoGroupFunction会以两个输入的元素遍历器为参数,只在每一个窗口中被调用一次。

三.间隔Join

interval join用一个公共Key链接两个流的元素(将它们称为A & B),其中流B的元素的时间戳具备相对于流A中的元素的时间戳。 这也能够更正式地表示为b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中a和b是A和B中共享一个公钥的元素。下界和上界均可以是负的或正的,只要下界小于或等于上界。interval链接目前只执行内部链接。

当将一对元素传递给ProcessJoinFunction时,它们将给两个元素分配更大的时间戳(能够经过ProcessJoinFunction.Context访问)。

注意:间隔链接目前只支持事件时间。
图片

在上面的示例中,咱们将“橙色”和“绿色”两个流链接起来,它们的下界为-2毫秒,上界为+1毫秒。默认状况下,这些是包含边界的,可是能够经过.lowerboundexclusive()和. upperboundexclusive()进行设置。

再用更正式的符号来表示angeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound 如三角形所示。

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] { 
        override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = { 
         out.collect(left + "," + right); 
        }
      });
    });

四.示例

4.1 间隔Join

package com.lm.flink.datastream.join
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/** * @Classname IntervalJoin * @Description TODO * @Date 2020/10/27 20:32 * @Created by limeng * 区间关联当前仅支持EventTime * Interval JOIN 相对于UnBounded的双流JOIN来讲是Bounded JOIN。就是每条流的每一条数据会与另外一条流上的不一样时间区域的数据进行JOIN。 */
object IntervalJoin { 
  def main(args: Array[String]): Unit = { 
    //设置至少一次或仅此一次语义
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置至少一次或仅此一次语义
    env.enableCheckpointing(20000,CheckpointingMode.EXACTLY_ONCE)
    //设置
    env.getCheckpointConfig
      .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //设置重启策略
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,50000))
    env.setParallelism(1)
    val dataStream1 = env.socketTextStream("localhost",9999)
    val dataStream2 = env.socketTextStream("localhost",9998)
    import org.apache.flink.api.scala._
    val dataStreamMap1 = dataStream1.map(f=>{ 
      val tokens = f.split(",")
      StockTransaction(tokens(0),tokens(1),tokens(2).toDouble)
    }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockTransaction]{ 
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = { 
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockTransaction, previousElementTimestamp: Long): Long = { 
        val timestamp  = element.txTime.toLong
        currentTimestamp = Math.max(timestamp,currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })

    val dataStreamMap2 = dataStream2.map(f=>{ 
      val tokens = f.split(",")
      StockSnapshot(tokens(0),tokens(1),tokens(2).toDouble)
    }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockSnapshot]{ 
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = { 
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockSnapshot, previousElementTimestamp: Long): Long = { 
        val timestamp  = element.mdTime.toLong
        currentTimestamp = Math.max(timestamp,currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })
    dataStreamMap1.print("dataStreamMap1")
    dataStreamMap2.print("dataStreamMap2")
    dataStreamMap1.keyBy(_.txCode)
      .intervalJoin(dataStreamMap2.keyBy(_.mdCode))
      .between(Time.minutes(-10),Time.seconds(0))
      .process(new ProcessJoinFunction[StockTransaction,StockSnapshot,String] { 
        override def processElement(left: StockTransaction, right: StockSnapshot, ctx: ProcessJoinFunction[StockTransaction, StockSnapshot, String]#Context, out: Collector[String]): Unit = { 
          out.collect(left.toString +" =Interval Join=> "+right.toString)
        }
      }).print()

    env.execute("IntervalJoin")
  }
  case class StockTransaction(txTime:String,txCode:String,txValue:Double) extends Serializable{ 
    override def toString: String = txTime +"#"+txCode+"#"+txValue
  }
  case class StockSnapshot(mdTime:String,mdCode:String,mdValue:Double) extends Serializable { 
    override def toString: String = mdTime +"#"+mdCode+"#"+mdValue
  }
}

结果

get timestamp is 1603708942 currentMaxTimestamp 1603708942
dataStreamMap1> 1603708942#000001#10.4
get timestamp is 1603708942 currentMaxTimestamp 1603708942
dataStreamMap2> 1603708942#000001#10.4
1603708942#000001#10.4 =Interval Join=> 1603708942#000001#10.4

4.2 窗口Join

package com.lm.flink.datastream.join
import java.lang
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/** * @Classname InnerLeftRightJoinTest * @Description TODO * @Date 2020/10/26 17:22 * @Created by limeng * window join */
object InnerLeftRightJoinTest { 
  def main(args: Array[String]): Unit = { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //每9秒发出一个watermark
    env.setParallelism(1)
    env.getConfig.setAutoWatermarkInterval(9000)

    val dataStream1 = env.socketTextStream("localhost", 9999)
    val dataStream2 = env.socketTextStream("localhost", 9998)

    /** * operator操做 * 数据格式: * tx: 2020/10/26 18:42:22,000002,10.2 * md: 2020/10/26 18:42:22,000002,10.2 * * 这里因为是测试,固水位线采用升序(即数据的Event Time 自己是升序输入) */
    import org.apache.flink.api.scala._
    val dataStreamMap1 = dataStream1
      .map(f => { 
        val tokens = f.split(",")
        StockTransaction(tokens(0), tokens(1), tokens(2).toDouble)
      }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockTransaction] { 
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = { 
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockTransaction, previousElementTimestamp: Long): Long = { 
        val timestamp = element.txTime.toLong
        currentTimestamp = Math.max(timestamp, currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })

    val dataStreamMap2 = dataStream2
      .map(f => { 
        val tokens = f.split(",")
        StockSnapshot(tokens(0), tokens(1), tokens(2).toDouble)
      }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockSnapshot] { 
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = { 
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockSnapshot, previousElementTimestamp: Long): Long = { 
        val timestamp = element.mdTime.toLong
        currentTimestamp = Math.max(timestamp, currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })

    dataStreamMap1.print("dataStreamMap1")
    dataStreamMap2.print("dataStreamMap2")

    /** * Join操做 * 限定范围是3秒钟的Event Time窗口 */
    val joinedStream = dataStreamMap1.coGroup(dataStreamMap2)
      .where(_.txCode)
      .equalTo(_.mdCode)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    val innerJoinedStream = joinedStream.apply(new InnerJoinFunction)
    val leftJoinedStream = joinedStream.apply(new LeftJoinFunction)
    val rightJoinedStream = joinedStream.apply(new RightJoinFunction)
    innerJoinedStream.name("InnerJoinedStream").print()
    leftJoinedStream.name("LeftJoinedStream").print()
    rightJoinedStream.name("RightJoinedStream").print()
    env.execute("InnerLeftRightJoinTest")
  }

  class InnerJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] { 
    override def coGroup(first: lang.Iterable[StockTransaction], second: lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = { 
      import scala.collection.JavaConverters._
      val scalaT1 = first.asScala.toList
      val scalaT2 = second.asScala.toList

      println(scalaT1.size)
      println(scalaT2.size)
      /** * Inner join 要比较的是同一个key下,同一个时间窗口内 */
      if (scalaT1.nonEmpty && scalaT2.nonEmpty) { 
        for (transaction <- scalaT1) { 
          for (snapshot <- scalaT2) { 
            out.collect(transaction.txCode, transaction.txTime, snapshot.mdTime, transaction.txValue, snapshot.mdValue, "Inner Join Test")
          }
        }
      }
    }
  }
  class LeftJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] { 
    override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = { 
      /** * 将Java中的Iterable对象转换为Scala的Iterable * scala的集合操做效率高,简洁 */
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList
      /** * Left Join要比较的是同一个key下,同一个时间窗口内的数据 */
      if (scalaT1.nonEmpty && scalaT2.isEmpty) { 
        for (transaction <- scalaT1) { 
          out.collect(transaction.txCode, transaction.txTime, "", transaction.txValue, 0, "Left Join Test")
        }
      }
    }
  }
  class RightJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] { 
    override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = { 
      /** * 将Java中的Iterable对象转换为Scala的Iterable * scala的集合操做效率高,简洁 */
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList
      /** * Right Join要比较的是同一个key下,同一个时间窗口内的数据 */
      if (scalaT1.isEmpty && scalaT2.nonEmpty) { 
        for (snapshot <- scalaT2) { 
          out.collect(snapshot.mdCode, "", snapshot.mdTime, 0, snapshot.mdValue, "Right Join Test")
        }
      }
    }
  }

  case class StockTransaction(txTime: String, txCode: String, txValue: Double)
  case class StockSnapshot(mdTime: String, mdCode: String, mdValue: Double)
}

参考

https://www.jianshu.com/p/ba19e4d1d802

公众号

在这里插入图片描述 名称:大数据计算 微信号:bigdata_limeng

相关文章
相关标签/搜索