聊聊flink的Allowed Lateness

本文主要研究一下flink的Allowed Latenesshtml

WindowedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.javajava

@Public
public class WindowedStream<T, K, W extends Window> {

	/** The keyed data stream that is windowed by this stream. */
	private final KeyedStream<T, K> input;

	/** The window assigner. */
	private final WindowAssigner<? super T, W> windowAssigner;

	/** The trigger that is used for window evaluation/emission. */
	private Trigger<? super T, ? super W> trigger;

	/** The evictor that is used for evicting elements before window evaluation. */
	private Evictor<? super T, ? super W> evictor;

	/** The user-specified allowed lateness. */
	private long allowedLateness = 0L;

	/**
	 * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
	 * dropped.
 	 */
	private OutputTag<T> lateDataOutputTag;

	@PublicEvolving
	public WindowedStream<T, K, W> allowedLateness(Time lateness) {
		final long millis = lateness.toMilliseconds();
		checkArgument(millis >= 0, "The allowed lateness cannot be negative.");

		this.allowedLateness = millis;
		return this;
	}

	@PublicEvolving
	public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
		Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
		this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
		return this;
	}

	//......

	public <R> SingleOutputStreamOperator<R> reduce(
			ReduceFunction<T> reduceFunction,
			WindowFunction<T, R, K, W> function,
			TypeInformation<R> resultType) {

		if (reduceFunction instanceof RichFunction) {
			throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
		}

		//clean the closures
		function = input.getExecutionEnvironment().clean(function);
		reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);

		final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
		KeySelector<T, K> keySel = input.getKeySelector();

		OneInputStreamOperator<T, R> operator;

		if (evictor != null) {
			@SuppressWarnings({"unchecked", "rawtypes"})
			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
				(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

			ListStateDescriptor<StreamRecord<T>> stateDesc =
				new ListStateDescriptor<>("window-contents", streamRecordSerializer);

			operator =
				new EvictingWindowOperator<>(windowAssigner,
					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
					keySel,
					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
					stateDesc,
					new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
					trigger,
					evictor,
					allowedLateness,
					lateDataOutputTag);

		} else {
			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
				reduceFunction,
				input.getType().createSerializer(getExecutionEnvironment().getConfig()));

			operator =
				new WindowOperator<>(windowAssigner,
					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
					keySel,
					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
					stateDesc,
					new InternalSingleValueWindowFunction<>(function),
					trigger,
					allowedLateness,
					lateDataOutputTag);
		}

		return input.transform(opName, resultType, operator);
	}

	//......
}
  • WindowedStream有两个参数跟Allowed Lateness相关,一个是allowedLateness,用于指定容许元素迟到的时间长度,一个是lateDataOutputTag,用于配置迟到元素的输出
  • WindowedStream的reduce、aggregate、fold、process等操做里头会根据evictor是否为null来建立不一样的WindowOperator(evictor不为null建立的是EvictingWindowOperator,evictor为null建立的是WindowOperator)
  • EvictingWindowOperator继承了WindowOperator,其构造器比WindowOperator多了Evictor参数,但它们构造器都须要Trigger、allowedLateness、lateDataOutputTag参数

OutputTag

flink-core-1.7.0-sources.jar!/org/apache/flink/util/OutputTag.javaapache

@PublicEvolving
public class OutputTag<T> implements Serializable {

	private static final long serialVersionUID = 2L;

	private final String id;

	private final TypeInformation<T> typeInfo;

	public OutputTag(String id) {
		Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
		Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
		this.id = id;

		try {
			this.typeInfo = TypeExtractor.createTypeInfo(this, OutputTag.class, getClass(), 0);
		}
		catch (InvalidTypesException e) {
			throw new InvalidTypesException("Could not determine TypeInformation for the OutputTag type. " +
					"The most common reason is forgetting to make the OutputTag an anonymous inner class. " +
					"It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.", e);
		}
	}

	public OutputTag(String id, TypeInformation<T> typeInfo) {
		Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
		Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
		this.id = id;
		this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
	}

	// ------------------------------------------------------------------------

	public String getId() {
		return id;
	}

	public TypeInformation<T> getTypeInfo() {
		return typeInfo;
	}

	// ------------------------------------------------------------------------

	@Override
	public boolean equals(Object obj) {
		return obj instanceof OutputTag
			&& ((OutputTag) obj).id.equals(this.id);
	}

	@Override
	public int hashCode() {
		return id.hashCode();
	}

	@Override
	public String toString() {
		return "OutputTag(" + getTypeInfo() + ", " + id + ")";
	}
}
  • OutputTag是一个带有名称及类型信息的side output标识;flink容许ProcessFunction、CoProcessFunction、ProcessWindowFunction、ProcessAllWindowFunction这些function输出side output,这些function的Context有一个output(OutputTag<X> outputTag, X value)方法用于输出元素到side output

SingleOutputStreamOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.javawindows

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {

	protected boolean nonParallel = false;

	private Map<OutputTag<?>, TypeInformation> requestedSideOutputs = new HashMap<>();

	private boolean wasSplitApplied = false;

	//......

	public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
		if (wasSplitApplied) {
			throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
				"As a work-around, please add a no-op map function before the split() call.");
		}

		sideOutputTag = clean(requireNonNull(sideOutputTag));

		// make a defensive copy
		sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo());

		TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag);
		if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
			throw new UnsupportedOperationException("A side output with a matching id was " +
					"already requested with a different type. This is not allowed, side output " +
					"ids need to be unique.");
		}

		requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());

		SideOutputTransformation<X> sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
		return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
	}
}
  • SingleOutputStreamOperator提供了getSideOutput方法,能够根据OutputTag来获取以前在function里头输出的site output

WindowOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.javaapi

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
	extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
	implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

	//......

	public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
			element.getValue(), element.getTimestamp(), windowAssignerContext);

		//if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {
			MergingWindowSet<W> mergingWindows = getMergingWindowSet();

			for (W window: elementWindows) {

				// adding the new window might result in a merge, in that case the actualWindow
				// is the merged window and we work with that. If we don't merge then
				// actualWindow == window
				W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
					@Override
					public void merge(W mergeResult,
							Collection<W> mergedWindows, W stateWindowResult,
							Collection<W> mergedStateWindows) throws Exception {

						if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
							throw new UnsupportedOperationException("The end timestamp of an " +
									"event-time window cannot become earlier than the current watermark " +
									"by merging. Current watermark: " + internalTimerService.currentWatermark() +
									" window: " + mergeResult);
						} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
							throw new UnsupportedOperationException("The end timestamp of a " +
									"processing-time window cannot become earlier than the current processing time " +
									"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
									" window: " + mergeResult);
						}

						triggerContext.key = key;
						triggerContext.window = mergeResult;

						triggerContext.onMerge(mergedWindows);

						for (W m: mergedWindows) {
							triggerContext.window = m;
							triggerContext.clear();
							deleteCleanupTimer(m);
						}

						// merge the merged state windows into the newly resulting state window
						windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
					}
				});

				// drop if the window is already late
				if (isWindowLate(actualWindow)) {
					mergingWindows.retireWindow(actualWindow);
					continue;
				}
				isSkippedElement = false;

				W stateWindow = mergingWindows.getStateWindow(actualWindow);
				if (stateWindow == null) {
					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
				}

				windowState.setCurrentNamespace(stateWindow);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = actualWindow;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(actualWindow, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(actualWindow);
			}

			// need to make sure to update the merging state in state
			mergingWindows.persist();
		} else {
			for (W window: elementWindows) {

				// drop if the window is already late
				if (isWindowLate(window)) {
					continue;
				}
				isSkippedElement = false;

				windowState.setCurrentNamespace(window);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = window;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(window, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(window);
			}
		}

		// side output input event if
		// element not handled by any window
		// late arriving tag has been set
		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
		if (isSkippedElement && isElementLate(element)) {
			if (lateDataOutputTag != null){
				sideOutput(element);
			} else {
				this.numLateRecordsDropped.inc();
			}
		}
	}

	protected boolean isElementLate(StreamRecord<IN> element){
		return (windowAssigner.isEventTime()) &&
			(element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
	}

	private long cleanupTime(W window) {
		if (windowAssigner.isEventTime()) {
			long cleanupTime = window.maxTimestamp() + allowedLateness;
			return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
		} else {
			return window.maxTimestamp();
		}
	}

    //......
}
  • WindowOperator里头有个isElementLate方法,根据allowedLateness来判断一个element是否late,其processElement方法最后在isSkippedElement为true并且isElementLate也为true的状况下会执行以下逻辑:在lateDataOutputTag不为null的状况下会将late的element输出到side output,若是lateDataOutputTag为null,则执行numLateRecordsDropped.inc()来递增numLateRecordsDropped统计数

小结

  • 当使用event-time window的时候,flink提供了allowedLateness方法用来配置元素容许的迟到时间,超过该值则会被丢弃(在窗口结束时间+容许迟到时间内到达的元素仍然会被添加到窗口内),默认该参数设置为0;对于使用GlobalWindows这类window assigner,因为其end时间戳为Long.MAX_VALUE,所以element就无所谓late
  • OutputTag是一个带有名称及类型信息的side output标识;flink容许ProcessFunction、CoProcessFunction、ProcessWindowFunction、ProcessAllWindowFunction这些function输出side output,这些function的Context有一个output(OutputTag<X> outputTag, X value)方法用于输出元素到side output
  • SingleOutputStreamOperator提供了getSideOutput方法,能够根据OutputTag来获取以前在function里头输出的site output;WindowOperator的processElement方法在最后会判断,若是isSkippedElement为true并且isElementLate也为true,则在lateDataOutputTag不为null的状况下会将late的element输出到side output

doc

相关文章
相关标签/搜索