Netty源码解析 -- FastThreadLocal与HashedWheelTimer

Netty源码分析系列文章已接近尾声,本文再来分析Netty中两个常见组件:FastThreadLoca与HashedWheelTimer。
源码分析基于Netty 4.1.52算法

FastThreadLocal

FastThreadLocal比较简单。
FastThreadLocal和FastThreadLocalThread是配套使用的。
FastThreadLocalThread继承了Thread,FastThreadLocalThread#threadLocalMap 是一个InternalThreadLocalMap,该InternalThreadLocalMap对象只能用于当前线程。
InternalThreadLocalMap#indexedVariables是一个数组,存放了当前线程全部FastThreadLocal对应的值。
而每一个FastThreadLocal都有一个index,用于定位InternalThreadLocalMap#indexedVariables。
数组

FastThreadLocal#get微信

public final V get() {
    // #1
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    // #2
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    // #3
    return initialize(threadLocalMap);
}

#1 获取该线程的InternalThreadLocalMap
若是是FastThreadLocalThread,直接获取FastThreadLocalThread#threadLocalMap。
不然,从UnpaddedInternalThreadLocalMap.slowThreadLocalMap获取该线程InternalThreadLocalMap。
注意,UnpaddedInternalThreadLocalMap.slowThreadLocalMap是一个ThreadLocal,这里实际回退到使用ThreadLocal了。
#2 每一个FastThreadLocal都有一个index。
经过该index,获取InternalThreadLocalMap#indexedVariables中存放的值
#3 找不到值,经过initialize方法构建新对象。源码分析

能够看到,FastThreadLocal中连hash算法都不用,经过下标获取对应的值,复杂度为log(1),天然很快啦。性能

HashedWheelTimer

HashedWheelTimer是Netty提供的时间轮调度器。
时间轮是一种充分利用线程资源进行批量化任务调度的调度模型,可以高效的管理各类延时任务。
简单说,就是将延时任务存放到一个环形队列中,并经过执行线程定时执行该队列的任务。this

例如,
环形队列上有60个格子,
执行线程每秒移动一个格子,则环形队列每轮可存放1分钟内的任务。
如今有两个定时任务
task1,32秒后执行
task2,2分25秒后执行
而执行线程当前位于第6格子
则task1放到32+6=38格,轮数为0
task2放到25+6=31个,轮数为2
执行线程将执行当前格子轮数为0的任务,并将其余任务轮数减1。
imagespa

缺点,时间轮调度器的时间精度不高。
由于时间轮算法的精度取决于执行线程移动速度。
例如上面例子中执行线程每秒移动一个格子,则调度精度小于一秒的任务就没法准时调用。线程

HashedWheelTimer关键字段code

// 任务执行器,负责执行任务
Worker worker = new Worker();
// 任务执行线程
Thread workerThread;
//  HashedWheelTimer状态, 0 - init, 1 - started, 2 - shut down
int workerState;
// 时间轮队列,使用数组实现
HashedWheelBucket[] wheel;
// 暂存新增的任务
Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
// 已取消任务
Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();

添加延迟任务 HashedWheelTimer#newTimeoutorm

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    ...

    // #1
    start();

    // #2
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    ...
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

#1 若是HashedWheelTimer未启动,则启动该HashedWheelTimer
HashedWheelTimer#start方法负责是启动workerThread线程
#2 startTime是HashedWheelTimer启动时间
deadline是相对HashedWheelTimer启动的延迟时间
构建HashedWheelTimeout,添加到HashedWheelTimer#timeouts

时间轮运行 Worker#run

public void run() {
    ...

    // #1
    startTimeInitialized.countDown();

    do {
        // #2
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            // #3
            int idx = (int) (tick & mask);
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // #4
            transferTimeoutsToBuckets();
            // #5
            bucket.expireTimeouts(deadline);
            // #6
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // #7
    ...
}

#1 HashedWheelTimer#start方法阻塞HashedWheelTimer线程直到Worker启动完成,这里解除HashedWheelTimer线程阻塞。
#2 计算下一格子开始执行的时间,而后sleep到下次格子开始执行时间
#2 tick是从HashedWheelTimer启动后移动的总格子数,这里获取tick对应的格子索引。
因为Long类型足够大,这里并不考虑溢出问题。
#4 将HashedWheelTimer#timeouts的任务迁移到对应的格子中
#5 处理已到期任务
#6 移动到下一个格子
#7 这里是HashedWheelTimer#stop后的逻辑处理,取消任务,中止时间轮

迁移任务 Worker#transferTimeoutsToBuckets

private void transferTimeoutsToBuckets() {
    // #1
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            continue;
        }
        // #2
        long calculated = timeout.deadline / tickDuration;
        // #3
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        // #4
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        // #5
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}

#1 注意,每次只迁移100000个任务,以避免阻塞线程
#2 任务延迟时间/每格时间数, 获得该任务需延迟的总格子移动数
#3 (总格子移动数 - 已移动格子数)/每轮格子数,获得轮数
#4 若是任务在timeouts队列放得过久致使已通过了执行时间,则使用当前tick, 也就是放到当前bucket,以便尽快执行该任务
#5 计算tick对应格子索引,放到对应的格子位置

执行到期任务 HashedWheelBucket#expireTimeouts

public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        // #1
        if (timeout.remainingRounds <= 0) {
            // #2
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                // #3
                timeout.expire();
            } else {
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            // #4
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}

#1 选择轮数小于等于0的任务
#2 移除任务
#3 修改状态为过时,并执行任务
#4 其余任务轮数减1

ScheduledExecutorService使用堆(DelayedWorkQueue)维护任务,新增任务复杂度为O(logN)。
而 HashedWheelTimer 新增任务复杂度为O(1),因此在任务很是多时, HashedWheelTimer 能够表现出它的优点。
可是任务较少甚至没有任务时,HashedWheelTimer的执行线程都须要不断移动,也会形成性能消耗。
注意,HashedWheelTimer使用同一个线程调用和执行任务,若是某些任务执行时间太久,则影响后续定时任务执行。固然,咱们也能够考虑在任务中另起线程执行逻辑。
另外,若是任务过多,也会致使任务长期滞留在HashedWheelTimer#timeouts中而不能及时执行。

若是您以为本文不错,欢迎关注个人微信公众号,系列文章持续更新中。您的关注是我坚持的动力!