Flink中的Time与Window

1、Timejava

在Flink的流式处理中,会涉及到时间的不一样概念web

Event Time(事件时间):是事件建立的时间。它一般由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录本身的生成时间,Flink经过时间戳分配器访问事件时间戳apache

Ingestion Time(采集时间):是数据进入Flink的时间windows

Processing Time(处理时间):是每个执行基于时间操做的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。api

例如一条日志进入Flink的时间为2017-11-12 10:00:00.123 到达window的系统时间为 2017-11-12 10:00:01.234,日志内容以下:网络

2017-11-02 18:37:15.624 INFO Fair over to rm2session

对于业务来讲,要统计1min内的故障日志个数,哪一个时间是最有意义的?----- eventTime,由于咱们要根据日志的生成时间进行统计。socket

  

若是要想聚合,不可能对无解数据流进行聚合。ide

 

2、Windowspa

一、streaming流式计算是一种被设计用于处理处理无限数据集的数据处理引擎,而无限数据集是指一种不断增加的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。

Window是无限数据流处理的核心,Window将一个无限的stream拆分红有限大小的"buckets"桶,咱们能够在这些桶上作计算操做。

共有两类,五种时间窗口。

二、Window类型(两类)

2.一、CountWindow:按照指定的数据条数生成一个window,与时间无关

2.二、TimeWindow:按照时间生成window。(按照Processing Time来划分Window)

对于TimeWindow和CountWindow,能够根据窗口实现原理的不一样分红三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

(1)滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切分。

特色:时间对齐,窗口长度固定,没有重叠。

滚动窗口分配器将每一个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,而且不会出现重叠。

(2)滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

特色:时间对齐,窗口长度固定,有重叠。

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口相似,窗口的大小由窗口大小参数来配置,另外一个窗口滑动参数控制滑动窗口开始的频率。

所以,滑动窗口若是滑动参数小于窗口大小的话,窗口是能够重叠的,在这种状况下元素会被分配到多个窗口中。

使用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警。)

(3)会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的timeout间隙组成。相似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特色:时间无对齐。

session 窗口分配器经过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的状况,相反,当它在一个固定的

时间周期内再也不收到元素,即非活动间隔产生,那这个窗口就会关闭。一个Session窗口经过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃

周期产生,那么当前的session将关闭而且后续的元素将被分配到新的session窗口中去。

 

3、Window API

3.一、CountWindow

CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。

注意:CountWindow的window_size 指的是相同key的元素的个数,不是输入的全部元素的总数。

import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment} /** * CountWindow 中的滚动窗口(Tumbling Windows) * 将数据依据固定的窗口长度对数据进行切分。 */ object TimeAndWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.socketTextStream("localhost",11111) val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0) //注意:CountWindow的window_size 指的是相同key的元素的个数,不是输入的全部元素的总数。
    val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5) .reduce((item1, item2)=>(item1._1,item1._2+item2._2)) streamWindow.print() env.execute("TimeAndWindow") } }

3.2

import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment} /** * CountWindow 中的滑动窗口(Sliding Windows) * 将数据依据固定的窗口长度对数据进行切分。 */ object TimeAndWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.socketTextStream("localhost",11111) val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0) //注意:CountWindow的window_size 指的是相同key的元素的个数,不是输入的全部元素的总数。 //知足步长,就执行一次,按第一个参数的长度
    val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5,2) .reduce((item1, item2)=>(item1._1,item1._2+item2._2)) streamWindow.print() env.execute("TimeAndWindow") } }

4、EventTime与Window

一、EventTime的引入

在Flink的流式处理中,绝大部分的业务都会使用eventTime,通常只在eventTime没法使用时,才会被迫使用ProcessingTime或者IngestionTime。

若是要使用EventTime,那么须要引入EventTime的时间戳,引入方式以下所示:

二、Watermark

  概念:咱们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分状况下,流到operator的数据都是按照事件产生的

事件戳顺序来的,可是也不排除因为网络、背压等缘由,致使乱序的产生,所谓乱序,就是指Flink接收到的事件的前后顺序不是严格按照事件的EventTime顺序排列的。

  Watermark是一种衡量Event Time进展的机制,它是数据自己的一个隐藏属性,数据自己携带着对应的Watermark。

  Watermark是用于处理乱序事件的,而正确的处理乱序事件,一般用Watermark机制结合window来实现。

  数据流中的Watermark用于表示eventTime小于Watermark的数量,都已经到达了,所以,window的执行也是由Watermark触发的。

  Watermark能够理解成一个延迟触发机制。咱们能够设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,而后认定eventTime 小于

maxEventTime-t 的全部数据都已经到达。若是有窗口的中止时间等于maxEventTime-t,那么这个窗口被触发执行。

滚动窗口/滑动窗口/会话窗口

 
  
import org.apache.flink.api.java.tuple.Tupleimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}import org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindow/**  * TimeWindow  */object EventTimeAndWindow {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    //开启watermark    //从调用时刻开始给env建立的每个stream追加时间特征。    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    val stream: KeyedStream[(String, Long), Tuple] = env.socketTextStream("192.168.218.130", 1111).assignTimestampsAndWatermarks(      new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) {        override def extractTimestamp(element: String): Long = {          // event word  eventTime是日志生成时间,咱们从日志中解析EventTime          val eventTime = element.split(" ")(0).toLong          println(eventTime)          eventTime        }      }    ).map(item => (item.split(" ")(1),1L)).keyBy(0)    //加上滚动窗口,窗口大小是5s,调用window的api//    val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))    //滑动窗口//    val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))    //会话窗口    val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))    val streamReduce = streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2))    streamReduce.print()        env.execute("EventTimeAndWindow")  }}
相关文章
相关标签/搜索