A look at Debezium's ElapsedTimeStrategy

This article takes a look at Debezium's ElapsedTimeStrategy.

ElapsedTimeStrategy

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

@FunctionalInterface
public interface ElapsedTimeStrategy {

    /**
     * Determine if the time period has elapsed since this method was last called.
     *
     * @return {@code true} if this invocation caused the thread to sleep, or {@code false} if this method did not sleep
     */
    boolean hasElapsed();

}
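  • ElapsedTimeStrategy is a functional interface with a single method, hasElapsed, which reports whether the configured time period has elapsed. The @return tag in the Javadoc talks about sleeping, but none of the implementations shown below sleep; they only compare timestamps.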

none

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

public static ElapsedTimeStrategy none() {
        return () -> true;
    }
  • The strategy returned by none always returns true from hasElapsed
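Because the interface is functional, none can be written as a one-line lambda, and the same holds for any ad-hoc strategy. The sketch below illustrates this with a local stand-in interface named Elapsed, so that it compiles without Debezium on the classpath. The everyNthCall helper is hypothetical and is not part of Debezium.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class NoneDemo {

    /** Local stand-in for ElapsedTimeStrategy (assumption: same single-method shape). */
    @FunctionalInterface
    interface Elapsed {
        boolean hasElapsed();
    }

    /** Mirrors none(): every call reports that the period has elapsed. */
    static Elapsed none() {
        return () -> true;
    }

    /** Hypothetical strategy, not in Debezium: true on every n-th call. */
    static Elapsed everyNthCall(int n) {
        AtomicInteger calls = new AtomicInteger();
        return () -> calls.incrementAndGet() % n == 0;
    }

    public static void main(String[] args) {
        Elapsed always = none();
        System.out.println(always.hasElapsed()); // prints true

        Elapsed third = everyNthCall(3);
        System.out.println(third.hasElapsed()); // prints false
        System.out.println(third.hasElapsed()); // prints false
        System.out.println(third.hasElapsed()); // prints true
    }
}
```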

constant

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

public static ElapsedTimeStrategy constant(Clock clock, long delayInMilliseconds) {
        if (delayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Initial delay must be positive");
        }
        return new ElapsedTimeStrategy() {
            private long nextTimestamp = 0L;

            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    // Initialize ...
                    nextTimestamp = clock.currentTimeInMillis() + delayInMilliseconds;
                    return true;
                }
                long current = clock.currentTimeInMillis();
                if (current >= nextTimestamp) {
                    do {
                        long multiple = 1 + (current - nextTimestamp) / delayInMilliseconds;
                        nextTimestamp += multiple * delayInMilliseconds;
                    } while (current > nextTimestamp);
                    return true;
                }
                return false;
            }
        };
    }
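  • constant takes a clock and delayInMilliseconds. The first call to hasElapsed sets nextTimestamp to the current time plus delayInMilliseconds and returns true. Later calls return false while current is less than nextTimestamp; otherwise they advance nextTimestamp past current by a whole multiple of delayInMilliseconds and return true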
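To see the behaviour of constant with concrete numbers, here is a minimal self-contained sketch. It copies the logic of the listing above, but replaces Debezium's Clock with a LongSupplier backed by a manually advanced value. The Elapsed interface, the poll helper, and the chosen timestamps are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

public class ConstantDemo {

    /** Local stand-in for ElapsedTimeStrategy, so the sketch compiles without Debezium. */
    interface Elapsed {
        boolean hasElapsed();
    }

    /** Same logic as constant(...), with the clock replaced by a LongSupplier. */
    static Elapsed constant(LongSupplier clock, long delayMs) {
        return new Elapsed() {
            private long nextTimestamp = 0L;

            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    // First call: schedule the next deadline and report true.
                    nextTimestamp = clock.getAsLong() + delayMs;
                    return true;
                }
                long current = clock.getAsLong();
                if (current >= nextTimestamp) {
                    do {
                        // Skip over any whole periods that were missed.
                        long multiple = 1 + (current - nextTimestamp) / delayMs;
                        nextTimestamp += multiple * delayMs;
                    } while (current > nextTimestamp);
                    return true;
                }
                return false;
            }
        };
    }

    /** Calls hasElapsed() once at each fake time and records the results. */
    static List<Boolean> poll(long delayMs, long... times) {
        long[] now = { 0L };
        Elapsed strategy = constant(() -> now[0], delayMs);
        List<Boolean> results = new ArrayList<>();
        for (long t : times) {
            now[0] = t;
            results.add(strategy.hasElapsed());
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [true, false, true, false, true, false, true]
        System.out.println(poll(100, 1000, 1050, 1100, 1150, 1350, 1399, 1400));
    }
}
```

The call at 1350 arrives 150 ms after the deadline of 1200, so the deadline jumps by two periods to 1400 rather than to 1300. Deadlines therefore stay aligned to the original grid instead of drifting with late calls.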

step

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

public static ElapsedTimeStrategy step(Clock clock,
                                           long preStepDelayInMilliseconds,
                                           BooleanSupplier stepFunction,
                                           long postStepDelayInMilliseconds) {
        if (preStepDelayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Pre-step delay must be positive");
        }
        if (postStepDelayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Post-step delay must be positive");
        }
        return new ElapsedTimeStrategy() {
            private long nextTimestamp = 0L;
            private boolean elapsed = false;
            private long delta = 0L;

            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    // Initialize ...
                    elapsed = stepFunction.getAsBoolean();
                    delta = elapsed ? postStepDelayInMilliseconds : preStepDelayInMilliseconds;
                    nextTimestamp = clock.currentTimeInMillis() + delta;
                    return true;
                }
                if (!elapsed) {
                    elapsed = stepFunction.getAsBoolean();
                    if (elapsed) {
                        delta = postStepDelayInMilliseconds;
                    }
                }
                long current = clock.currentTimeInMillis();
                if (current >= nextTimestamp) {
                    do {
                        assert delta > 0;
                        long multiple = 1 + (current - nextTimestamp) / delta;
                        nextTimestamp += multiple * delta;
                    } while (nextTimestamp <= current);
                    return true;
                }
                return false;
            }
        };
    }
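  • step takes a stepFunction together with a pre-step and a post-step delay. On the first call, hasElapsed evaluates stepFunction, sets delta to postStepDelayInMilliseconds if it returned true and to preStepDelayInMilliseconds otherwise, sets nextTimestamp to clock.currentTimeInMillis() + delta, and returns true. On later calls, while elapsed is still false it re-evaluates stepFunction and switches delta to postStepDelayInMilliseconds once the function returns true. It then returns false while current is less than nextTimestamp; otherwise it advances nextTimestamp by a multiple of delta and returns true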
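The switch from the pre-step delay to the post-step delay is easier to follow with numbers. The sketch below reproduces the logic of step with a fake clock. The Elapsed interface, the poll helper, and the stepDoneAt threshold (the fake time from which the step function starts returning true) are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

public class StepDemo {

    /** Local stand-in for ElapsedTimeStrategy, so the sketch compiles without Debezium. */
    interface Elapsed {
        boolean hasElapsed();
    }

    /** Same logic as step(...), with the clock replaced by a LongSupplier. */
    static Elapsed step(LongSupplier clock, long preMs, BooleanSupplier stepFunction, long postMs) {
        return new Elapsed() {
            private long nextTimestamp = 0L;
            private boolean elapsed = false;
            private long delta = 0L;

            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    elapsed = stepFunction.getAsBoolean();
                    delta = elapsed ? postMs : preMs;
                    nextTimestamp = clock.getAsLong() + delta;
                    return true;
                }
                if (!elapsed) {
                    // Keep checking until the step happens, then switch delays for good.
                    elapsed = stepFunction.getAsBoolean();
                    if (elapsed) {
                        delta = postMs;
                    }
                }
                long current = clock.getAsLong();
                if (current >= nextTimestamp) {
                    do {
                        long multiple = 1 + (current - nextTimestamp) / delta;
                        nextTimestamp += multiple * delta;
                    } while (nextTimestamp <= current);
                    return true;
                }
                return false;
            }
        };
    }

    /** Polls at the given fake times; the step function turns true once now >= stepDoneAt. */
    static List<Boolean> poll(long preMs, long postMs, long stepDoneAt, long... times) {
        long[] now = { 0L };
        Elapsed strategy = step(() -> now[0], preMs, () -> now[0] >= stepDoneAt, postMs);
        List<Boolean> results = new ArrayList<>();
        for (long t : times) {
            now[0] = t;
            results.add(strategy.hasElapsed());
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [true, true, false, true, false, true]
        System.out.println(poll(100, 1000, 1150, 1000, 1100, 1150, 1200, 1300, 2200));
    }
}
```

In this run the step happens at 1150, but the deadline of 1200 had already been scheduled with the pre-step delay and is still honoured. Only the deadline after that one, 2200, uses the 1000 ms post-step delay.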

linear

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

public static ElapsedTimeStrategy linear(Clock clock, long delayInMilliseconds) {
        if (delayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Initial delay must be positive");
        }
        return new ElapsedTimeStrategy() {
            private long nextTimestamp = 0L;
            private long counter = 1L;

            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    // Initialize ...
                    nextTimestamp = clock.currentTimeInMillis() + delayInMilliseconds;
                    counter = 1L;
                    return true;
                }
                long current = clock.currentTimeInMillis();
                if (current >= nextTimestamp) {
                    do {
                        if (counter < Long.MAX_VALUE) {
                            ++counter;
                        }
                        nextTimestamp += (delayInMilliseconds * counter);
                    } while (nextTimestamp <= current);
                    return true;
                }
                return false;
            }
        };
    }
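  • linear advances nextTimestamp by delayInMilliseconds * counter and increments counter each time the period elapses, so the gap between consecutive true results grows linearly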
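As a rough illustration of how the gaps grow, the following sketch re-creates the linear logic with a fake clock and polls it on a fixed grid, collecting the times at which hasElapsed returned true. The Elapsed interface and the fireTimes helper are assumptions made for this example and are not Debezium APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

public class LinearDemo {

    /** Local stand-in for ElapsedTimeStrategy, so the sketch compiles without Debezium. */
    interface Elapsed {
        boolean hasElapsed();
    }

    /** Same logic as linear(...), with the clock replaced by a LongSupplier. */
    static Elapsed linear(LongSupplier clock, long delayMs) {
        return new Elapsed() {
            private long nextTimestamp = 0L;
            private long counter = 1L;

            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    nextTimestamp = clock.getAsLong() + delayMs;
                    counter = 1L;
                    return true;
                }
                long current = clock.getAsLong();
                if (current >= nextTimestamp) {
                    do {
                        if (counter < Long.MAX_VALUE) {
                            ++counter;
                        }
                        // Each new gap is one delay longer than the previous one.
                        nextTimestamp += delayMs * counter;
                    } while (nextTimestamp <= current);
                    return true;
                }
                return false;
            }
        };
    }

    /** Polls every pollEveryMs from start to end and returns the times that reported true. */
    static List<Long> fireTimes(long delayMs, long start, long end, long pollEveryMs) {
        long[] now = { start };
        Elapsed strategy = linear(() -> now[0], delayMs);
        List<Long> fired = new ArrayList<>();
        for (long t = start; t <= end; t += pollEveryMs) {
            now[0] = t;
            if (strategy.hasElapsed()) {
                fired.add(t);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // prints [1000, 1100, 1300, 1600, 2000]
        System.out.println(fireTimes(100, 1000, 2000, 50));
    }
}
```

With a delay of 100 ms the gaps between true results are 100, 200, 300, and 400 ms.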

exponential

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

public static ElapsedTimeStrategy exponential(Clock clock,
                                                  long initialDelayInMilliseconds,
                                                  long maxDelayInMilliseconds,
                                                  double multiplier) {
        if (multiplier <= 1.0) {
            throw new IllegalArgumentException("Multiplier must be greater than 1");
        }
        if (initialDelayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Initial delay must be positive");
        }
        if (initialDelayInMilliseconds >= maxDelayInMilliseconds) {
            throw new IllegalArgumentException("Maximum delay must be greater than initial delay");
        }
        return new ElapsedTimeStrategy() {
            private long nextTimestamp = 0L;
            private long previousDelay = 0L;

            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    // Initialize ...
                    nextTimestamp = clock.currentTimeInMillis() + initialDelayInMilliseconds;
                    previousDelay = initialDelayInMilliseconds;
                    return true;
                }
                long current = clock.currentTimeInMillis();
                if (current >= nextTimestamp) {
                    do {
                        // Compute how long to delay ...
                        long nextDelay = (long) (previousDelay * multiplier);
                        if (nextDelay >= maxDelayInMilliseconds) {
                            previousDelay = maxDelayInMilliseconds;
                            // If we're not there yet, then we know the increment is linear from here ...
                            if (nextTimestamp < current) {
                                long multiple = 1 + (current - nextTimestamp) / maxDelayInMilliseconds;
                                nextTimestamp += multiple * maxDelayInMilliseconds;
                            }
                        }
                        else {
                            previousDelay = nextDelay;
                        }
                        nextTimestamp += previousDelay;
                    } while (nextTimestamp <= current);
                    return true;
                }
                return false;
            }
        };
    }
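  • exponential multiplies previousDelay by multiplier each time the period elapses and adds the result to nextTimestamp. Once the computed delay reaches maxDelayInMilliseconds it is capped at that value, and any remaining gap to current is closed in multiples of maxDelayInMilliseconds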
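The growth and the cap can be checked with a small simulation. The sketch below mirrors the exponential logic with a fake clock and polls it on a fixed grid. As in the earlier sketches, the Elapsed interface and the fireTimes helper are hypothetical and exist only for illustration; the argument validation of the original method is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

public class ExponentialDemo {

    /** Local stand-in for ElapsedTimeStrategy, so the sketch compiles without Debezium. */
    interface Elapsed {
        boolean hasElapsed();
    }

    /** Same logic as exponential(...), minus argument checks, with a LongSupplier clock. */
    static Elapsed exponential(LongSupplier clock, long initialMs, long maxMs, double multiplier) {
        return new Elapsed() {
            private long nextTimestamp = 0L;
            private long previousDelay = 0L;

            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    nextTimestamp = clock.getAsLong() + initialMs;
                    previousDelay = initialMs;
                    return true;
                }
                long current = clock.getAsLong();
                if (current >= nextTimestamp) {
                    do {
                        long nextDelay = (long) (previousDelay * multiplier);
                        if (nextDelay >= maxMs) {
                            // Capped: from here on the deadline moves in steps of maxMs.
                            previousDelay = maxMs;
                            if (nextTimestamp < current) {
                                long multiple = 1 + (current - nextTimestamp) / maxMs;
                                nextTimestamp += multiple * maxMs;
                            }
                        }
                        else {
                            previousDelay = nextDelay;
                        }
                        nextTimestamp += previousDelay;
                    } while (nextTimestamp <= current);
                    return true;
                }
                return false;
            }
        };
    }

    /** Polls every pollEveryMs from start to end and returns the times that reported true. */
    static List<Long> fireTimes(long initialMs, long maxMs, double multiplier,
                                long start, long end, long pollEveryMs) {
        long[] now = { start };
        Elapsed strategy = exponential(() -> now[0], initialMs, maxMs, multiplier);
        List<Long> fired = new ArrayList<>();
        for (long t = start; t <= end; t += pollEveryMs) {
            now[0] = t;
            if (strategy.hasElapsed()) {
                fired.add(t);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // prints [1000, 1100, 1300, 1700, 2500, 3500, 4500]
        System.out.println(fireTimes(100, 1000, 2.0, 1000, 4500, 100));
    }
}
```

With an initial delay of 100 ms, a multiplier of 2.0 and a maximum of 1000 ms, the gaps are 100, 200, 400, and 800 ms, after which they stay at 1000 ms.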

Summary

ElapsedTimeStrategy defines the hasElapsed method and provides several implementations: none, constant, step, linear, and exponential.
