【赵强老师】Flink的Watermark机制(基于Flink 1.11.0实现)

在使用eventTime的时候如何处理乱序数据?咱们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分状况下,流到operator的数据都是按照事件产生的时间顺序来的,可是也不排除因为网络延迟等缘由,致使乱序的产生,特别是使用kafka的话,多个分区的数据没法保证有序。因此在进行window计算的时候,咱们又不能无限期的等下去,必需要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。Watermark是用于处理乱序事件的,用于衡量Event Time进展的机制。watermark能够翻译为水位线。java

1、Watermark的核心原理

Watermark的核心本质能够理解成一个延迟触发机制。
在 Flink 的窗口处理过程当中,若是肯定所有数据到达,就能够对 Window 的全部数据作 窗口计算操做(如汇总、分组等),若是数据没有所有到达,则继续等待该窗口中的数据全 部到达才开始处理。这种状况下就须要用到水位线(WaterMarks)机制,它可以衡量数据处 理进度(表达数据到达的完整性),保证事件数据(所有)到达 Flink 系统,或者在乱序及 延迟到达时,也可以像预期同样计算出正确而且连续的结果。当任何 Event 进入到 Flink 系统时,会根据当前最大事件时间产生 Watermarks 时间戳。apache

那么 Flink 是怎么计算 Watermak 的值呢?编程

Watermark =进入Flink 的最大的事件时间(mxtEventTime)-指定的延迟时间(t)windows

那么有 Watermark 的 Window 是怎么触发窗口函数的呢?
若是有窗口的中止时间等于或者小于 maxEventTime - t(当时的warkmark),那么这个窗口被触发执行。api

其核心处理流程以下图所示。网络

2、Watermark的三种使用状况

一、原本有序的Stream中的 Watermark

若是数据元素的事件时间是有序的,Watermark 时间戳会随着数据元素的事件时间按顺 序生成,此时水位线的变化和事件时间保持一直(由于既然是有序的时间,就不须要设置延迟了,那么t就是 0。因此 watermark=maxtime-0 = maxtime),也就是理想状态下的水位 线。当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算,以此类推, 下一个 Window 也是同样。这种状况实际上是乱序数据的一种特殊状况。app

二、乱序事件中的Watermark

现实状况下数据元素每每并非按照其产生顺序接入到 Flink 系统中进行处理,而频繁 出现乱序或迟到的状况,这种状况就须要使用 Watermarks 来应对。好比下图,设置延迟时间t为2。socket

三、并行数据流中的Watermark

在多并行度的状况下,Watermark 会有一个对齐机制,这个对齐机制会取全部 Channel 中最小的 Watermark。ide

3、设置Watermark的核心代码

一、首先,正确设置事件处理的时间语义,通常都是采用Event Time。

sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

二、其次,指定生成Watermark的机制,包括:延时处理的时间和EventTime对应的字段。以下:

注意:不论是数据是否有序,均可以使用上面的代码。有序的数据只是无序数据的一种特殊状况。函数

4、Watermark编程案例

测试数据:基站的手机通话数据,以下:

需求:按基站,每5秒统计通话时间最长的记录。

  • StationLog用于封装基站数据
package watermark;

//station1,18688822219,18684812319,10,1595158485855
public class StationLog {
    private String stationID;   //基站ID
    private String from;        //呼叫放
    private String to;            //被叫方
    private long duration;        //通话的持续时间
    private long callTime;        //通话的呼叫时间
    public StationLog(String stationID, String from, 
                      String to, long duration, 
                      long callTime) {
        this.stationID = stationID;
        this.from = from;
        this.to = to;
        this.duration = duration;
        this.callTime = callTime;
    }
    public String getStationID() {
        return stationID;
    }
    public void setStationID(String stationID) {
        this.stationID = stationID;
    }
    public long getCallTime() {
        return callTime;
    }
    public void setCallTime(long callTime) {
        this.callTime = callTime;
    }
    public String getFrom() {
        return from;
    }
    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }
    public void setTo(String to) {
        this.to = to;
    }
    public long getDuration() {
        return duration;
    }
    public void setDuration(long duration) {
        this.duration = duration;
    }
}
  • 代码实现:WaterMarkDemo用于完成计算(注意:为了方便我们测试设置任务的并行度为1)
package watermark;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

//每隔五秒,将过去是10秒内,通话时间最长的通话日志输出。
public class WaterMarkDemo {
    public static void main(String[] args) throws Exception {
        //获得Flink流式处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        //设置周期性的产生水位线的时间间隔。当数据流很大的时候,若是每一个事件都产生水位线,会影响性能。
        env.getConfig().setAutoWatermarkInterval(100);//默认100毫秒
        
        //获得输入流
        DataStreamSource<String> stream = env.socketTextStream("bigdata111", 1234);
        stream.flatMap(new FlatMapFunction<String, StationLog>() {

            public void flatMap(String data, Collector<StationLog> output) throws Exception {
                String[] words = data.split(",");
                //                           基站ID            from    to        通话时长                                                    callTime
                output.collect(new StationLog(words[0], words[1],words[2], Long.parseLong(words[3]), Long.parseLong(words[4])));
            }
        }).filter(new FilterFunction<StationLog>() {
            
            @Override
            public boolean filter(StationLog value) throws Exception {
                return value.getDuration() > 0?true:false;
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
                    @Override
                    public long extractTimestamp(StationLog element, long recordTimestamp) {
                        return element.getCallTime(); //指定EventTime对应的字段
                    }
                })
        ).keyBy(new KeySelector<StationLog, String>(){
            @Override
            public String getKey(StationLog value) throws Exception {
                return value.getStationID();  //按照基站分组
            }}
        ).timeWindow(Time.seconds(5)) //设置时间窗口
        .reduce(new MyReduceFunction(),new MyProcessWindows()).print();

        env.execute();
    }
}
//用于如何处理窗口中的数据,即:找到窗口内通话时间最长的记录。
class MyReduceFunction implements ReduceFunction<StationLog> {
    @Override
    public StationLog reduce(StationLog value1, StationLog value2) throws Exception {
        // 找到通话时间最长的通话记录
        return value1.getDuration() >= value2.getDuration() ? value1 : value2;
    }
}
//窗口处理完成后,输出的结果是什么
class MyProcessWindows extends ProcessWindowFunction<StationLog, String, String, TimeWindow> {
    @Override
    public void process(String key, ProcessWindowFunction<StationLog, String, String, TimeWindow>.Context context,
            Iterable<StationLog> elements, Collector<String> out) throws Exception {
        StationLog maxLog = elements.iterator().next();

        StringBuffer sb = new StringBuffer();
        sb.append("窗口范围是:").append(context.window().getStart()).append("----").append(context.window().getEnd()).append("\n");;
        sb.append("基站ID:").append(maxLog.getStationID()).append("\t")
          .append("呼叫时间:").append(maxLog.getCallTime()).append("\t")
          .append("主叫号码:").append(maxLog.getFrom()).append("\t")
          .append("被叫号码:")    .append(maxLog.getTo()).append("\t")
          .append("通话时长:").append(maxLog.getDuration()).append("\n");
        out.collect(sb.toString());
    }
}

相关文章
相关标签/搜索