Flink watermark

Flink中watermark主要解决保序问题. 而保序问题的根本缘由是多个任务同时从流中并行处理数据,顺序没法保证. html

上游: 生成watermark
通常在WINDOW 操做以前生成WATERMARK, WATERMARK 有两种:
AssignWithPeriodicWatermarks:
每隔N秒自动向流里注入一个WATERMARK 时间间隔由ExecutionConfig.setAutoWatermarkInterval 决定. 每次调用getCurrentWatermark 方法, 若是获得的WATERMARK 不为空而且比以前的大就注入流中 (emitWatermark)
参考 TimestampsAndPeriodicWatermarksOperator.processElement 前端

AssignWithPunctuatedWatermarks:
基于事件向流里注入一个WATERMARK,每个元素都有机会判断是否生成一个WATERMARK. 若是获得的WATERMARK 不为空而且比以前的大就注入流中 (emitWatermark)
参考 TimestampsAndPunctuatedWatermarksOperator.processElement算法

每次生成WATERMARK将覆盖流中已有的WATERMARK apache

下游: 处理watermark
StatusWatermarkValve 负责将不一样Channel 的Watermark 对齐,再传给pipeline 下游,对齐的概念是当前Channel的Watermark时间大于全部Channel最小的Watermark时间
Flink watermark并发

WindowOperator 的处理:
WindowOperator.processElementide

  1. WindowAssigner.assignWindows 为当前的消息分配滑动窗口
    经常使用的有: TumblingEventTimeWindows: 按照消息的 EventTime 分配窗口 (每次生成单个窗口)
    TumblingProcessingTimeWindows 按照当前的时间分配窗口 (每次生成单个窗口)
    须要配合StreamExecutionEnvironment.setStreamTimeCharacteristic 使用 (默认是TimeCharacteristic.ProcessingTime), 这个必须匹配
    不然没法正常触发滑动窗口

实际观察结果:测试

  • 若是使用ProcessingTimeWindows 即便Event 自己的时间落后于窗口时间不少也会被触发
  • 不管是否使用WATERMARK,窗口中的数据会有乱序,即后到窗口中的数据早于先到窗口中的数据
  • 若是使用EventTimeWindow, 数据和窗口时间对齐不会乱序,同一窗口中的数据不能严格保证顺序,须要SORT.
  • 最后一批数据有缺失,缺失的数据取决于WATERMARK的MAXOUTOFORDERNESS
  • 默认的WATERMARK算法是根据元素的最大时间决定的,当没有新的元素进入流中的时候,水位再也不上涨,再减去MAXOUTOFORDERNESS, 则最后一批数据没法落在水位之下,致使WINDOW没法触发
  1. 将当前的滑动窗口和对象加入WindowState, 根据不一样的应用场景会使用不一样的WindowState. WindowState 的类型由WindowedStream的具体操做决定, 生成对应的StateDescriptor, 不一样的WindowState 的 add/get 行为会不一样. 好比HeapListWindowState 会把当前的对象追加到currentNamespace (即Timewindow) 对应的List 下. 好比HeapAggregateState 会对当前的对象应用Aggregate function 并更新结果

Window 触发的条件
在 WindowOperator 中有两个点会检查窗口是否触发,二者的检查条件有所不一样优化

  1. processElement 这是在新的流数据进入时触发
    检查条件: watermark时间 >= 窗口最大时间 参见 EventTimeTrigger.onElement
    若是窗口不能被触发则调用InteralTimeService.registerEventTimeTimer 注册一个定时器,以KEY+窗口最大时间为条件触发, 到必定时间后定时器会被自动销毁. 时间为窗口最大时间+WindowOperator.allowedLateness WindowOperator.allowedLateness 能够经过 Stream.window(...).allowedLateness(...) 设置. 通常应该略大于WatermarkGenerator 的 maxOutOfOrderness this

  2. onEventTime 或者 onProcessingTime 取决于Watermark的类型, 这是在Watermark更新的时候触发 (InteralTimeService.advanceWatermark). 这时会把当前Watermark 的时间和以前注册的定时器的时间作比较, 若是定时器还存在而且Watermark的时间大于定时器时间则能够触发窗口. 参见 EventTimeTrigger.onEventTime
    Flink watermark
    参考 http://blog.csdn.net/lmalds/article/details/52704170

WATERMARK和普通数据分开处理
若是一个元素来的过晚 element.getTimestamp + allowedLateness < currentWatermark
会有一个特殊的OutputTag 和正常的流数据区分开
参考 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.htmlspa

若是窗口来的过晚, window.maxTimestamp + allowedLateness < currentWatermark, 则窗口会被直接丢弃

Watermark 的问题:
默认的Watermark机制是数据驱动的,新的数据进入才会触发水位上升, 而因为maxOutOfOrderness 的存在, watermark < 最大流数据时间 < 当前窗口结束时间
根据以前的分析,最新的时间窗口老是不会被触发,除非更新的数据进入再次提升水位到当前窗口结束时间之后, 若是数据进入的频率低或者没有新的数据进入流,那最新的时间窗口被处理的延时会很是高甚至永远不会被触发,这在实时性要求高的流式系统是很致命的. 好比一个银行系统,要作客户帐号层面的保序,每一个帐号的交易可能一天只有几笔甚至一笔,若是咱们在Window 处理的时候KEY BY 帐号就会引发上述问题. 咱们能够考虑KEY BY的条件改成 HASH(帐号) 再取模,而后在窗口处理中再次根据帐号分组,这样虽然处理复杂一些,可是保证了窗口中数据的频次

另一种方案是优化WATERMARK生成的机制,若是一段时间后WATERMARK仍然没有变化,那就将WATERMARK自动上涨一次到当前窗口的结束时间,这样保证窗口处理的延时有个上限

public abstract class AbstractWatermarkGenerator<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = -2006930231735705083L;
    private static final Logger logger = LoggerFactory.getLogger(AbstractWatermarkGenerator.class);
    private final long maxOutOfOrderness; // 10 seconds
    private long windowSize;
    private long currentMaxTimestamp;
    private long lastTimestamp = 0;
    private long lastWatermarkChangeTime = 0;
    private long windowPurgeTime = 0;

    public AbstractWatermarkGenerator(long maxOutOfOrderness, long windowSize) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.windowSize = windowSize;
    }

    public AbstractWatermarkGenerator() {
        this(10000, 10000);
    }

    protected abstract long extractCurTimestamp(T element) throws Exception;

    public long extractTimestamp(T element,
            long previousElementTimestamp) {
        try {
            long curTimestamp = extractCurTimestamp(element);
            lastWatermarkChangeTime = new Date().getTime();
            currentMaxTimestamp = Math.max(curTimestamp, currentMaxTimestamp);
            windowPurgeTime = Math.max(windowPurgeTime, getWindowExpireTime(currentMaxTimestamp));
            if (logger.isDebugEnabled()) {
                logger.debug("Extracting timestamp: {}", currentMaxTimestamp);
            }
            return curTimestamp;
        } catch (Exception e) {
            logger.error("Error extracting timestamp", e);      
        }

        return 0;
    }

    protected long getWindowExpireTime(long currentMaxTimestamp) {
        long windowStart = TimeWindow.getWindowStartWithOffset(currentMaxTimestamp, 0, windowSize);
        long windowEnd = windowStart + windowSize;
        return windowEnd + maxOutOfOrderness;
    }

    public Watermark getCurrentWatermark() {
        long curTime = new Date().getTime();
        if (currentMaxTimestamp > lastTimestamp) {
            if (logger.isDebugEnabled()) {
                logger.debug("Current max timestamp has been increased since last");
            }
            lastTimestamp = currentMaxTimestamp;
            lastWatermarkChangeTime = curTime;
        }
        else {
            long diff = windowPurgeTime - currentMaxTimestamp;
            if (diff > 0 && curTime - lastWatermarkChangeTime > diff) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Increase current MaxTimestamp once");
                }
                currentMaxTimestamp = windowPurgeTime;
                lastTimestamp = currentMaxTimestamp;
                lastWatermarkChangeTime = curTime;
            }
        }

        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

实际测试中发现 WATERMARK是否触发和算子的并发度和WATERMARK生成的位置有关
测试结果以下:

  • Env default parallism 10: Source parallism 20, window parallism 6, watermark 生成定义在keyby 以前
    Source 为单独的SUBTASK 并发度为20, 以后到WINDOW算子以前合成一个SUBTASK,并发度为10, WINDOW SUBTASK 并发度为6, 窗口能够正常触发
  • Env default parallism 20, Source parallism 20, window parallism 6, watermark 生成定义在keyby 以前
    Source 到 WINDOW 算子以前 合成一个SUBTASK,并发度为20, WINDOW SUBTASK 并发度为6, 窗口能够正常触发
  • Env default parallism 60, Source parallism 20, window parallism 10, watermark 生成定义在keyby 以前
    Source 为单独的SUBTASK 并发度为20, 以后到WINDOW算子以前合成一个SUBTASK,并发度为60,WINDOW SUBTASK 并发度为10, 窗口不能正常触发 (我的理解缘由是算子并发度扩大,致使一些CHANNEL处理线程没有数据,根据上文的解释,WATERMARK对齐会取全部CHANNEL最小的WATERMARK,致使水位没法上涨
    能够从FLINK CONSOLE的WATERMARKS看出)
  • Env default parallism 60, Source parallism 20, window parallism 10, watermark 生成定义在Source以后
    Source 为单独的SUBTASK 并发度为20, 以后到WINDOW算子以前合成一个SUBTASK,并发度为60,WINDOW SUBTASK 并发度为10, 窗口能够正常触发
  • Env default parallism 10, Source parallism 20, window parallism 20, watermark 生成定义在keyby 以前
    Source 为单独的SUBTASK 并发度为20, 以后到WINDOW算子以前合成一个SUBTASK,并发度为10, WINDOW SUBTASK 并发度为20, 窗口能够正常触发
  • Env default parallism 30, Source parallism 20, window parallism 20, watermark 生成定义在keyby 以前
    Source 为单独的SUBTASK 并发度为20, 以后到WINDOW算子以前合成一个SUBTASK,并发度为30, WINDOW SUBTASK 并发度为20, 窗口不能正常触发
  • Env default parallism 30, Source parallism 20, window parallism 20, watermark 生成定义在keyby 以前
    Source 为单独的SUBTASK 并发度为20, 以后到WINDOW算子以前合成一个SUBTASK,并发度为30, WINDOW SUBTASK 并发度为20, 窗口不能正常触发
  • Env default parallism 30, Source parallism 20, window parallism 20, watermark 生成定义在Source 以后
    Source 为单独的SUBTASK 并发度为20, 以后到WINDOW算子以前合成一个SUBTASK,并发度为30, WINDOW SUBTASK 并发度为20, 窗口能够正常触发

因此注意WINDOW算子以前最好避免让下游算子的并发度超过上游算子,不然就把WATERMARK的生成尽可能放到DAG的前端,这样WATERMARK能够被传递到下游算子

相关文章
相关标签/搜索