A Look at Flink's RestartStrategies

This article takes a look at Flink's RestartStrategies.

RestartStrategies

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

@PublicEvolving
public class RestartStrategies {

	/**
	 * Generates NoRestartStrategyConfiguration.
	 *
	 * @return NoRestartStrategyConfiguration
	 */
	public static RestartStrategyConfiguration noRestart() {
		return new NoRestartStrategyConfiguration();
	}

	public static RestartStrategyConfiguration fallBackRestart() {
		return new FallbackRestartStrategyConfiguration();
	}

	/**
	 * Generates a FixedDelayRestartStrategyConfiguration.
	 *
	 * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
	 * @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy
	 * @return FixedDelayRestartStrategy
	 */
	public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, long delayBetweenAttempts) {
		return fixedDelayRestart(restartAttempts, Time.of(delayBetweenAttempts, TimeUnit.MILLISECONDS));
	}

	/**
	 * Generates a FixedDelayRestartStrategyConfiguration.
	 *
	 * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
	 * @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy
	 * @return FixedDelayRestartStrategy
	 */
	public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayInterval) {
		return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval);
	}

	/**
	 * Generates a FailureRateRestartStrategyConfiguration.
	 *
	 * @param failureRate Maximum number of restarts in given interval {@code failureInterval} before failing a job
	 * @param failureInterval Time interval for failures
	 * @param delayInterval Delay in-between restart attempts
	 */
	public static FailureRateRestartStrategyConfiguration failureRateRestart(
			int failureRate, Time failureInterval, Time delayInterval) {
		return new FailureRateRestartStrategyConfiguration(failureRate, failureInterval, delayInterval);
	}

	//......
}
  • RestartStrategies provides the static methods noRestart, fallBackRestart, fixedDelayRestart, and failureRateRestart for building a RestartStrategyConfiguration.
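
The two fixedDelayRestart overloads in the listing above differ only in how the delay is passed: the long-based one treats its argument as milliseconds and delegates to the Time-based one. The following is a minimal, self-contained sketch of that delegation. It assumes java.time.Duration as a stand-in for Flink's Time, and the names FixedDelaySketch and FixedDelayConfig are hypothetical, not Flink API.

```java
import java.time.Duration;

// Hypothetical stand-in for RestartStrategies; not part of Flink.
final class FixedDelaySketch {

    // Simplified stand-in for FixedDelayRestartStrategyConfiguration.
    static final class FixedDelayConfig {
        final int restartAttempts;
        final Duration delay;

        FixedDelayConfig(int restartAttempts, Duration delay) {
            this.restartAttempts = restartAttempts;
            this.delay = delay;
        }
    }

    // Mirrors fixedDelayRestart(int, long): the long is interpreted as milliseconds.
    static FixedDelayConfig fixedDelayRestart(int restartAttempts, long delayMillis) {
        return fixedDelayRestart(restartAttempts, Duration.ofMillis(delayMillis));
    }

    // Mirrors fixedDelayRestart(int, Time): the overload that builds the configuration.
    static FixedDelayConfig fixedDelayRestart(int restartAttempts, Duration delay) {
        return new FixedDelayConfig(restartAttempts, delay);
    }

    public static void main(String[] args) {
        FixedDelayConfig config = fixedDelayRestart(3, 10_000L);
        System.out.println(config.restartAttempts + " attempts, delay " + config.delay.getSeconds() + " s");
    }
}
```

Keeping a single overload that actually constructs the object means the unit conversion lives in exactly one place.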

RestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

public abstract static class RestartStrategyConfiguration implements Serializable {
		private static final long serialVersionUID = 6285853591578313960L;

		private RestartStrategyConfiguration() {}

		/**
		 * Returns a description which is shown in the web interface.
		 *
		 * @return Description of the restart strategy
		 */
		public abstract String getDescription();
	}
  • RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription. Its subclasses are NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration, and FallbackRestartStrategyConfiguration.
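
One detail worth noting in the listing above is the private constructor: since only code inside the same top-level class can call it, the set of subclasses is effectively closed to the classes nested in RestartStrategies. A small sketch of that pattern, using hypothetical names (ConfigHierarchySketch, Config, NoRestartConfig, FallbackConfig) and reusing the two description strings quoted in this article:

```java
// Hypothetical outer class standing in for RestartStrategies; not part of Flink.
final class ConfigHierarchySketch {

    // The private constructor means only classes nested in ConfigHierarchySketch can extend Config.
    abstract static class Config {
        private Config() {}

        abstract String getDescription();
    }

    static final class NoRestartConfig extends Config {
        @Override
        String getDescription() {
            return "Restart deactivated.";
        }
    }

    static final class FallbackConfig extends Config {
        @Override
        String getDescription() {
            return "Cluster level default restart strategy";
        }
    }

    public static void main(String[] args) {
        Config[] configs = {new NoRestartConfig(), new FallbackConfig()};
        for (Config config : configs) {
            System.out.println(config.getDescription());
        }
    }
}
```

A class declared outside ConfigHierarchySketch that tried to extend Config would not compile, because the implicit super() call cannot reach the private constructor.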

NoRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

public static final class NoRestartStrategyConfiguration extends RestartStrategyConfiguration {
		private static final long serialVersionUID = -5894362702943349962L;

		@Override
		public String getDescription() {
			return "Restart deactivated.";
		}

		@Override
		public boolean equals(Object o) {
			if (this == o) {
				return true;
			}
			return o instanceof NoRestartStrategyConfiguration;
		}

		@Override
		public int hashCode() {
			return Objects.hash();
		}
	}
  • NoRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the no-restart strategy.

FixedDelayRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

public static final class FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration {
		private static final long serialVersionUID = 4149870149673363190L;

		private final int restartAttempts;
		private final Time delayBetweenAttemptsInterval;

		FixedDelayRestartStrategyConfiguration(int restartAttempts, Time delayBetweenAttemptsInterval) {
			this.restartAttempts = restartAttempts;
			this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
		}

		public int getRestartAttempts() {
			return restartAttempts;
		}

		public Time getDelayBetweenAttemptsInterval() {
			return delayBetweenAttemptsInterval;
		}

		@Override
		public int hashCode() {
			int result = restartAttempts;
			result = 31 * result + (delayBetweenAttemptsInterval != null ? delayBetweenAttemptsInterval.hashCode() : 0);
			return result;
		}

		@Override
		public boolean equals(Object obj) {
			if (obj instanceof FixedDelayRestartStrategyConfiguration) {
				FixedDelayRestartStrategyConfiguration other = (FixedDelayRestartStrategyConfiguration) obj;

				return restartAttempts == other.restartAttempts && delayBetweenAttemptsInterval.equals(other.delayBetweenAttemptsInterval);
			} else {
				return false;
			}
		}

		@Override
		public String getDescription() {
			return "Restart with fixed delay (" + delayBetweenAttemptsInterval + "). #"
				+ restartAttempts + " restart attempts.";
		}
	}
  • FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the fixed-delay restart strategy; it has two properties, restartAttempts and delayBetweenAttemptsInterval.
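
Because this configuration carries state, it compares by value rather than by identity. The sketch below illustrates that idea only and is not the Flink class: FixedDelayValueSketch is a hypothetical name, java.time.Duration stands in for Time, and it deliberately uses Objects.equals and Objects.hash (as the FailureRateRestartStrategyConfiguration listing does), whereas the listing above calls delayBetweenAttemptsInterval.equals directly.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for FixedDelayRestartStrategyConfiguration; not part of Flink.
final class FixedDelayValueSketch {
    private final int restartAttempts;
    private final Duration delay; // stand-in for Flink's Time

    FixedDelayValueSketch(int restartAttempts, Duration delay) {
        this.restartAttempts = restartAttempts;
        this.delay = delay;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof FixedDelayValueSketch)) {
            return false;
        }
        FixedDelayValueSketch other = (FixedDelayValueSketch) obj;
        // Objects.equals tolerates a null delay on either side.
        return restartAttempts == other.restartAttempts && Objects.equals(delay, other.delay);
    }

    @Override
    public int hashCode() {
        return Objects.hash(restartAttempts, delay);
    }

    public static void main(String[] args) {
        FixedDelayValueSketch a = new FixedDelayValueSketch(3, Duration.ofSeconds(10));
        FixedDelayValueSketch b = new FixedDelayValueSketch(3, Duration.ofMillis(10_000));
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```

Two instances built from the same attempts and the same delay are interchangeable, which is what makes such a configuration safe to compare after it has been serialized and shipped with the job.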

FailureRateRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

public static final class FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration {
		private static final long serialVersionUID = 1195028697539661739L;
		private final int maxFailureRate;

		private final Time failureInterval;
		private final Time delayBetweenAttemptsInterval;

		public FailureRateRestartStrategyConfiguration(int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) {
			this.maxFailureRate = maxFailureRate;
			this.failureInterval = failureInterval;
			this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
		}

		public int getMaxFailureRate() {
			return maxFailureRate;
		}

		public Time getFailureInterval() {
			return failureInterval;
		}

		public Time getDelayBetweenAttemptsInterval() {
			return delayBetweenAttemptsInterval;
		}

		@Override
		public String getDescription() {
			return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
					+ " and fixed delay " + delayBetweenAttemptsInterval.toString();
		}

		@Override
		public boolean equals(Object o) {
			if (this == o) {
				return true;
			}
			if (o == null || getClass() != o.getClass()) {
				return false;
			}
			FailureRateRestartStrategyConfiguration that = (FailureRateRestartStrategyConfiguration) o;
			return maxFailureRate == that.maxFailureRate &&
				Objects.equals(failureInterval, that.failureInterval) &&
				Objects.equals(delayBetweenAttemptsInterval, that.delayBetweenAttemptsInterval);
		}

		@Override
		public int hashCode() {
			return Objects.hash(maxFailureRate, failureInterval, delayBetweenAttemptsInterval);
		}
	}
  • FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the failure-rate restart strategy; it has three properties: maxFailureRate, failureInterval, and delayBetweenAttemptsInterval.

FallbackRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration {
		private static final long serialVersionUID = -4441787204284085544L;

		@Override
		public String getDescription() {
			return "Cluster level default restart strategy";
		}

		@Override
		public boolean equals(Object o) {
			if (this == o) {
				return true;
			}
			return o instanceof FallbackRestartStrategyConfiguration;
		}

		@Override
		public int hashCode() {
			return Objects.hash();
		}
	}
  • FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the cluster-level default restart strategy.

RestartStrategyResolving

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java

public final class RestartStrategyResolving {

	/**
	 * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
	 * The resolving strategy is as follows:
	 * <ol>
	 * <li>Strategy set within job graph.</li>
	 * <li>Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing
	 * is enabled.</li>
	 * <li>If no strategy was set on client and server side and checkpointing was enabled then
	 * {@link FixedDelayRestartStrategy} is used</li>
	 * </ol>
	 *
	 * @param clientConfiguration restart configuration given within the job graph
	 * @param serverStrategyFactory default server side strategy factory
	 * @param isCheckpointingEnabled if checkpointing was enabled for the job
	 * @return resolved strategy
	 */
	public static RestartStrategy resolve(
			RestartStrategies.RestartStrategyConfiguration clientConfiguration,
			RestartStrategyFactory serverStrategyFactory,
			boolean isCheckpointingEnabled) {

		final RestartStrategy clientSideRestartStrategy =
			RestartStrategyFactory.createRestartStrategy(clientConfiguration);

		if (clientSideRestartStrategy != null) {
			return clientSideRestartStrategy;
		} else {
			if (serverStrategyFactory instanceof NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) {
				return ((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory)
					.createRestartStrategy(isCheckpointingEnabled);
			} else {
				return serverStrategyFactory.createRestartStrategy();
			}
		}
	}

	private RestartStrategyResolving() {
	}
}
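  • RestartStrategyResolving provides a static resolve method for resolving a RestartStrategies.RestartStrategyConfiguration: it first tries to create a RestartStrategy from the client-side configuration via RestartStrategyFactory.createRestartStrategy, and only if that yields null does it fall back to the server-side RestartStrategyFactory.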
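
The resolution order in resolve can be sketched without any Flink types. In the following simplified model, an enum stands in for the concrete strategies and null stands for "no client-side strategy". All names (ResolveSketch, Strategy, ServerFactory, NoOrFixedIfCheckpointing) are hypothetical. The checkpointing-enabled branch follows the Javadoc quoted above; returning no-restart when checkpointing is disabled is an assumption inferred from the factory's name, since that class is not shown in this article.

```java
// Hypothetical model of the resolution order; not part of Flink.
final class ResolveSketch {

    enum Strategy { NO_RESTART, FIXED_DELAY, FAILURE_RATE }

    // Stand-in for a server-side RestartStrategyFactory.
    interface ServerFactory {
        Strategy create();
    }

    // Stand-in for NoOrFixedIfCheckpointingEnabledRestartStrategyFactory (behavior partly assumed).
    static final class NoOrFixedIfCheckpointing implements ServerFactory {
        @Override
        public Strategy create() {
            return Strategy.NO_RESTART;
        }

        Strategy create(boolean checkpointingEnabled) {
            return checkpointingEnabled ? Strategy.FIXED_DELAY : Strategy.NO_RESTART;
        }
    }

    static Strategy resolve(Strategy clientStrategy, ServerFactory serverFactory, boolean checkpointingEnabled) {
        // 1. A strategy set on the client side always wins.
        if (clientStrategy != null) {
            return clientStrategy;
        }
        // 2. No explicit server-side strategy: the outcome depends on checkpointing.
        if (serverFactory instanceof NoOrFixedIfCheckpointing) {
            return ((NoOrFixedIfCheckpointing) serverFactory).create(checkpointingEnabled);
        }
        // 3. Otherwise use whatever the server-side factory produces.
        return serverFactory.create();
    }

    public static void main(String[] args) {
        System.out.println(resolve(Strategy.FAILURE_RATE, new NoOrFixedIfCheckpointing(), true));
        System.out.println(resolve(null, new NoOrFixedIfCheckpointing(), true));
        System.out.println(resolve(null, () -> Strategy.FAILURE_RATE, false));
    }
}
```

The practical consequence is that a job-level setting overrides the cluster default, and the checkpointing flag only matters when neither side configured anything.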

RestartStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java

public interface RestartStrategy {

	/**
	 * True if the restart strategy can be applied to restart the {@link ExecutionGraph}.
	 *
	 * @return true if restart is possible, otherwise false
	 */
	boolean canRestart();

	/**
	 * Called by the ExecutionGraph to eventually trigger a full recovery.
	 * The recovery must be triggered on the given callback object, and may be delayed
	 * with the help of the given scheduled executor.
	 *
	 * <p>The thread that calls this method is not supposed to block/sleep.
	 *
	 * @param restarter The hook to restart the ExecutionGraph
	 * @param executor An scheduled executor to delay the restart
	 */
	void restart(RestartCallback restarter, ScheduledExecutor executor);
}
  • RestartStrategy defines two methods, canRestart and restart. Its implementations are NoRestartStrategy, FixedDelayRestartStrategy, and FailureRateRestartStrategy.
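
The contract implied by these two methods is that a caller checks canRestart before it calls restart. The sketch below shows one assumed shape of such a caller. The real caller is the ExecutionGraph, whose logic is not shown in this article, so simulateFailures and every other name here is hypothetical, and the executor parameter is dropped so that recovery runs immediately.

```java
// Hypothetical model of the canRestart/restart contract; not part of Flink.
final class RestartLoopSketch {

    // Mirrors the two-method shape of RestartStrategy, without the executor parameter.
    interface Strategy {
        boolean canRestart();

        void restart(Runnable recovery);
    }

    // Like NoRestartStrategy: never restartable, and restart() is unsupported.
    static final class NoRestart implements Strategy {
        @Override
        public boolean canRestart() {
            return false;
        }

        @Override
        public void restart(Runnable recovery) {
            throw new UnsupportedOperationException("restart is not supported");
        }
    }

    // Allows a fixed number of restarts and runs the recovery right away.
    static final class MaxAttempts implements Strategy {
        private final int maxAttempts;
        private int attempts;

        MaxAttempts(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        @Override
        public boolean canRestart() {
            return attempts < maxAttempts;
        }

        @Override
        public void restart(Runnable recovery) {
            attempts++;
            recovery.run();
        }
    }

    // Feeds a number of failures to the strategy and returns how many recoveries were triggered.
    static int simulateFailures(Strategy strategy, int failures) {
        final int[] recoveries = {0};
        for (int i = 0; i < failures; i++) {
            if (!strategy.canRestart()) {
                break; // in a real job this is where the failure would become terminal
            }
            strategy.restart(() -> recoveries[0]++);
        }
        return recoveries[0];
    }

    public static void main(String[] args) {
        System.out.println(simulateFailures(new NoRestart(), 5));
        System.out.println(simulateFailures(new MaxAttempts(2), 5));
    }
}
```

Guarding every restart with canRestart is what keeps the UnsupportedOperationException in the no-restart case from ever being reached.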

NoRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java

public class NoRestartStrategy implements RestartStrategy {

	@Override
	public boolean canRestart() {
		return false;
	}

	@Override
	public void restart(RestartCallback restarter, ScheduledExecutor executor) {
		throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
	}

	/**
	 * Creates a NoRestartStrategyFactory instance.
	 *
	 * @param configuration Configuration object which is ignored
	 * @return NoRestartStrategyFactory instance
	 */
	public static NoRestartStrategyFactory createFactory(Configuration configuration) {
		return new NoRestartStrategyFactory();
	}

	@Override
	public String toString() {
		return "NoRestartStrategy";
	}

	public static class NoRestartStrategyFactory extends RestartStrategyFactory {

		private static final long serialVersionUID = -1809462525812787862L;

		@Override
		public RestartStrategy createRestartStrategy() {
			return new NoRestartStrategy();
		}
	}
}
  • NoRestartStrategy implements the RestartStrategy interface; its canRestart method returns false, and its restart method throws UnsupportedOperationException.

FixedDelayRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java

public class FixedDelayRestartStrategy implements RestartStrategy {

	private final int maxNumberRestartAttempts;
	private final long delayBetweenRestartAttempts;
	private int currentRestartAttempt;

	public FixedDelayRestartStrategy(
		int maxNumberRestartAttempts,
		long delayBetweenRestartAttempts) {

		Preconditions.checkArgument(maxNumberRestartAttempts >= 0, "Maximum number of restart attempts must be positive.");
		Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");

		this.maxNumberRestartAttempts = maxNumberRestartAttempts;
		this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
		currentRestartAttempt = 0;
	}

	public int getCurrentRestartAttempt() {
		return currentRestartAttempt;
	}

	@Override
	public boolean canRestart() {
		return currentRestartAttempt < maxNumberRestartAttempts;
	}

	@Override
	public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
		currentRestartAttempt++;

		executor.schedule(new Runnable() {
			@Override
			public void run() {
				restarter.triggerFullRecovery();
			}
		}, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
	}

	/**
	 * Creates a FixedDelayRestartStrategy from the given Configuration.
	 *
	 * @param configuration Configuration containing the parameter values for the restart strategy
	 * @return Initialized instance of FixedDelayRestartStrategy
	 * @throws Exception
	 */
	public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
		int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);

		String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY);

		long delay;

		try {
			delay = Duration.apply(delayString).toMillis();
		} catch (NumberFormatException nfe) {
			throw new Exception("Invalid config value for " +
					ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
					". Value must be a valid duration (such as '100 milli' or '10 s')");
		}

		return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
	}

	@Override
	public String toString() {
		return "FixedDelayRestartStrategy(" +
				"maxNumberRestartAttempts=" + maxNumberRestartAttempts +
				", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts +
				')';
	}

	public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory {

		private static final long serialVersionUID = 6642934067762271950L;

		private final int maxAttempts;
		private final long delay;

		public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) {
			this.maxAttempts = maxAttempts;
			this.delay = delay;
		}

		@Override
		public RestartStrategy createRestartStrategy() {
			return new FixedDelayRestartStrategy(maxAttempts, delay);
		}
	}
}
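  • FixedDelayRestartStrategy implements the RestartStrategy interface. Its canRestart method compares currentRestartAttempt with maxNumberRestartAttempts; its restart method increments currentRestartAttempt and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after a delay of delayBetweenRestartAttempts milliseconds.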
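
The same counting-plus-delayed-scheduling idea can be tried out with the JDK alone. This is a minimal sketch and not the Flink class: java.util.concurrent.ScheduledExecutorService stands in for Flink's ScheduledExecutor, a plain Runnable stands in for RestartCallback, and FixedDelaySchedulingSketch and runUntilExhausted are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for FixedDelayRestartStrategy; not part of Flink.
final class FixedDelaySchedulingSketch {
    private final int maxAttempts;
    private final long delayMillis;
    private int currentAttempt;

    FixedDelaySchedulingSketch(int maxAttempts, long delayMillis) {
        if (maxAttempts < 0 || delayMillis < 0) {
            throw new IllegalArgumentException("maxAttempts and delayMillis must be non-negative");
        }
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    boolean canRestart() {
        return currentAttempt < maxAttempts;
    }

    // Counts the attempt first, then schedules the recovery; the calling thread never sleeps.
    void restart(Runnable recovery, ScheduledExecutorService executor) {
        currentAttempt++;
        executor.schedule(recovery, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Restarts until the budget is used up and returns how many recoveries were scheduled.
    static int runUntilExhausted(int maxAttempts, long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        FixedDelaySchedulingSketch strategy = new FixedDelaySchedulingSketch(maxAttempts, delayMillis);
        CountDownLatch recoveries = new CountDownLatch(maxAttempts);
        int scheduled = 0;
        try {
            while (strategy.canRestart()) {
                strategy.restart(recoveries::countDown, executor);
                scheduled++;
            }
            // Wait until every delayed recovery has actually run.
            if (!recoveries.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("recoveries did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for recoveries", e);
        } finally {
            executor.shutdownNow();
        }
        return scheduled;
    }

    public static void main(String[] args) {
        System.out.println(runUntilExhausted(3, 50));
    }
}
```

Handing the delay to the executor rather than sleeping matches the note in the RestartStrategy Javadoc that the thread calling restart is not supposed to block.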

FailureRateRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java

public class FailureRateRestartStrategy implements RestartStrategy {

	private final Time failuresInterval;
	private final Time delayInterval;
	private final int maxFailuresPerInterval;
	private final ArrayDeque<Long> restartTimestampsDeque;

	public FailureRateRestartStrategy(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
		Preconditions.checkNotNull(failuresInterval, "Failures interval cannot be null.");
		Preconditions.checkNotNull(delayInterval, "Delay interval cannot be null.");
		Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
		Preconditions.checkArgument(failuresInterval.getSize() > 0, "Failures interval must be greater than 0 ms.");
		Preconditions.checkArgument(delayInterval.getSize() >= 0, "Delay interval must be at least 0 ms.");

		this.failuresInterval = failuresInterval;
		this.delayInterval = delayInterval;
		this.maxFailuresPerInterval = maxFailuresPerInterval;
		this.restartTimestampsDeque = new ArrayDeque<>(maxFailuresPerInterval);
	}

	@Override
	public boolean canRestart() {
		if (isRestartTimestampsQueueFull()) {
			Long now = System.currentTimeMillis();
			Long earliestFailure = restartTimestampsDeque.peek();

			return (now - earliestFailure) > failuresInterval.toMilliseconds();
		} else {
			return true;
		}
	}

	@Override
	public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
		if (isRestartTimestampsQueueFull()) {
			restartTimestampsDeque.remove();
		}
		restartTimestampsDeque.add(System.currentTimeMillis());

		executor.schedule(new Runnable() {
			@Override
			public void run() {
				restarter.triggerFullRecovery();
			}
		}, delayInterval.getSize(), delayInterval.getUnit());
	}

	private boolean isRestartTimestampsQueueFull() {
		return restartTimestampsDeque.size() >= maxFailuresPerInterval;
	}

	@Override
	public String toString() {
		return "FailureRateRestartStrategy(" +
			"failuresInterval=" + failuresInterval +
			"delayInterval=" + delayInterval +
			"maxFailuresPerInterval=" + maxFailuresPerInterval +
			")";
	}

	public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
		int maxFailuresPerInterval = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
		String failuresIntervalString = configuration.getString(
				ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.apply(1, TimeUnit.MINUTES).toString()
		);
		String timeoutString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
		String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, timeoutString);

		Duration failuresInterval = Duration.apply(failuresIntervalString);
		Duration delay = Duration.apply(delayString);


		return new FailureRateRestartStrategyFactory(maxFailuresPerInterval, Time.milliseconds(failuresInterval.toMillis()), Time.milliseconds(delay.toMillis()));
	}

	public static class FailureRateRestartStrategyFactory extends RestartStrategyFactory {
		private static final long serialVersionUID = -373724639430960480L;

		private final int maxFailuresPerInterval;
		private final Time failuresInterval;
		private final Time delayInterval;

		public FailureRateRestartStrategyFactory(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
			this.maxFailuresPerInterval = maxFailuresPerInterval;
			this.failuresInterval = Preconditions.checkNotNull(failuresInterval);
			this.delayInterval = Preconditions.checkNotNull(delayInterval);
		}

		@Override
		public RestartStrategy createRestartStrategy() {
			return new FailureRateRestartStrategy(maxFailuresPerInterval, failuresInterval, delayInterval);
		}
	}
}
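  • FailureRateRestartStrategy implements the RestartStrategy interface. Its canRestart method returns true while restartTimestampsDeque holds fewer than maxFailuresPerInterval entries; once the deque holds maxFailuresPerInterval or more, it returns whether the time elapsed since earliestFailure exceeds failuresInterval. Its restart method removes the oldest entry if the deque is full, adds the current time to restartTimestampsDeque, and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after delayInterval.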
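
The sliding-window check is easier to follow with a controllable clock. The sketch below keeps only the timestamp bookkeeping and leaves out the delayed recovery. FailureRateSketch and recordRestart are hypothetical names, and the injectable LongSupplier clock is an addition made for deterministic testing (the listing above calls System.currentTimeMillis() directly).

```java
import java.util.ArrayDeque;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the bookkeeping in FailureRateRestartStrategy; not part of Flink.
final class FailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final LongSupplier clock; // assumption: injected so the example is deterministic
    private final ArrayDeque<Long> restartTimestamps;

    FailureRateSketch(int maxFailuresPerInterval, long intervalMillis, LongSupplier clock) {
        if (maxFailuresPerInterval <= 0 || intervalMillis <= 0) {
            throw new IllegalArgumentException("maxFailuresPerInterval and intervalMillis must be positive");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.restartTimestamps = new ArrayDeque<>(maxFailuresPerInterval);
    }

    boolean canRestart() {
        if (isFull()) {
            // The head of the deque is the oldest restart still being tracked.
            long earliest = restartTimestamps.peek();
            return clock.getAsLong() - earliest > intervalMillis;
        }
        return true;
    }

    // Records a restart, evicting the oldest timestamp once the deque is full.
    void recordRestart() {
        if (isFull()) {
            restartTimestamps.remove();
        }
        restartTimestamps.add(clock.getAsLong());
    }

    private boolean isFull() {
        return restartTimestamps.size() >= maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        final long[] now = {0L};
        FailureRateSketch strategy = new FailureRateSketch(2, 1_000L, () -> now[0]);
        strategy.recordRestart();      // restart at t = 0
        now[0] = 100L;
        strategy.recordRestart();      // restart at t = 100, deque is now full
        now[0] = 200L;
        System.out.println(strategy.canRestart()); // oldest restart is only 200 ms old
        now[0] = 1_001L;
        System.out.println(strategy.canRestart()); // oldest restart has left the window
    }
}
```

Since the deque never holds more than maxFailuresPerInterval timestamps, comparing against its head is enough to decide whether that many restarts all happened within the last interval.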

Summary

  • RestartStrategies provides the static methods noRestart, fallBackRestart, fixedDelayRestart, and failureRateRestart for building a RestartStrategyConfiguration.
  • RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription. Its subclasses are NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration, and FallbackRestartStrategyConfiguration.
  • RestartStrategyResolving provides a static resolve method that resolves a RestartStrategies.RestartStrategyConfiguration and then uses RestartStrategyFactory to create the RestartStrategy. RestartStrategy defines two methods, canRestart and restart; its implementations are NoRestartStrategy, FixedDelayRestartStrategy, and FailureRateRestartStrategy.
