深刻JDK源码之定时操做Timer类和TimerTask类实现

Timer类是一种线程设施,能够用来实现某一个时间或某一段时间后安排某一个任务执行一次或按期重复执行。该功能和TimerTask配合使用。TimerTask类用于实现由Timer安排的一次或重复执行的某个任务。每个Timer对象对应的是一个线程,所以计时器所执行的任务应该迅速完成,不然会延迟后续的任务执行。 ##JDK源码TimerTask类## 这个类是个抽象类比较简单,有四个常量表示定时器任务的状态,还有一个Object类型lock对象,至关一把锁,控制线程对定时器任务状态的同步访问。 nextExecutionTime:这个成员变量用到记录该任务下次执行时间, 其格式和System.currentTimeMillis()一致。 这个值是做为任务队列中任务排序的依据. 任务调试者执行每一个任务前会对这个值做处理,从新计算下一次任务执行时间,并为这个变量赋值.。 period:用来描述任务的执行方式: 0表示不重复执行的任务. 正数表示固定速率执行的任务. 负数表示固定延迟执行的任务。(固定速率: 不考虑该任务上一次执行状况,始终从开始时间算起的每period执行下一次. 固定延迟: 考虑该任务一次执行状况,在上一次执行后period执行下一次)。java

public abstract class TimerTask implements Runnable {
    //这个对象是用来控制访问TimerTask内部构件。锁机制
    final Object lock = new Object();
    //定时器任务的状态
    int state = VIRGIN;
    //定时器任务默认的状态,表示尚未被安排
    static final int VIRGIN = 0;
    //表示定时器任务被安排了
    static final int SCHEDULED   = 1;
    //表示定时器任务执行
    static final int EXECUTED    = 2;
    //表示定时器任务取消
    static final int CANCELLED   = 3;
    //下次执行任务时间
    long nextExecutionTime;
    long period = 0;

    protected TimerTask() {
    }
    // 此计时器任务要执行的操做。
    public abstract void run();

    // 取消此计时器任务。
    public boolean cancel() {
        synchronized(lock) {
            boolean result = (state == SCHEDULED);
            state = CANCELLED;
            return result;
        }
    }
    // 返回此任务最近实际 执行的已安排 执行时间。
    public long scheduledExecutionTime() {
        synchronized(lock) {
            return (period < 0 ? nextExecutionTime + period
                            : nextExecutionTime - period);
        }
    }
}

##深刻JDK源码之Timer类## Timer中最主要由三个部分组成: 任务 TimerTask 、 任务队列: TaskQueue queue 和 任务调试者:TimerThread thread。 ###任务队列TaskQueue,它是Timer的一个内部类### 事实上任务队列是一个数组, 采用平衡二叉堆来实现他的优先级调度, 而且是一个小顶堆. 须要注意的是, 这个堆中queue[n] 的孩子是queue[2n] 和 queue[2n+1]。api

任务队列的优先级按照TimerTask类的成员变量nextExecutionTime值来排序(注意, 这里的任务指的是那些交由定时器来执行的, 继承TimerTask的对象).数组

在任务队列中, nextExecutionTime最小就是全部任务中最先要被调度来执行的, 因此被安排在queue[1] (假设任务队列非空).oop

对于堆中任意一个节点n, 和他的任意子孙节点d,必定遵循: n.nextExecutionTime <= d.nextExecutionTime.测试

// 任务队列
class TaskQueue {
    // 计时器任务数组,默认大小为128
    private TimerTask[] queue = new TimerTask[128];

    private int size = 0;

    int size() {
        return size;
    }

    // 加入队列
    void add(TimerTask task) {
        // Grow backing store if necessary
        if (size + 1 == queue.length)
            // 队列以两倍的速度扩容
            queue = Arrays.copyOf(queue, 2 * queue.length);

        queue[++size] = task;
        fixUp(size);
    }

    // 获取队列的元素,即第一个任务,第一个元素存储的是
    TimerTask getMin() {
        return queue[1];
    }

    TimerTask get(int i) {
        return queue[i];
    }

    // 消除头任务从优先队列。
    void removeMin() {
        queue[1] = queue[size];
        queue[size--] = null; // Drop extra reference to prevent memory leak
        fixDown(1);
    }

    /**
     * Removes the ith element from queue without regard for maintaining the
     * heap invariant. Recall that queue is one-based, so 1 <= i <= size.
     */
    void quickRemove(int i) {
        // 断言,在这里只起测试做用
        assert i <= size;

        queue[i] = queue[size];
        queue[size--] = null; // Drop extra ref to prevent memory leak
    }

    /**
     * Sets the nextExecutionTime associated with the head task to the specified
     * value, and adjusts priority queue accordingly.
     */
    void rescheduleMin(long newTime) {
        queue[1].nextExecutionTime = newTime;
        fixDown(1);
    }

    /**
     * Returns true if the priority queue contains no elements.
     */
    boolean isEmpty() {
        return size == 0;
    }

    /**
     * Removes all elements from the priority queue.
     */
    void clear() {
        // Null out task references to prevent memory leak
        for (int i = 1; i <= size; i++)
            queue[i] = null;
        size = 0;
    }

    // 进行队列中任务优先级调整. fixUp方法的做用是尽可能将队列中指定位置(k)的任务向队列前面移动,
    // 即提升它的优先级. 由于新加入的方法颇有可能比已经在任务队列中的其它任务要更早执行.
    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;// 左移一位,至关于除以2
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];
            queue[j] = queue[k];
            queue[k] = tmp;
            k = j;
        }
    }

    // 从任务队列中移除一个任务的过程, 首先直接将当前任务队列中最后一个任务赋给queue[1],
    // 而后将队列中任务数量--, 最后和上面相似, 可是这里是调用fixDown(int k)方法了, 尽可能将k位置的任务向队列后面移动.
    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size && queue[j].nextExecutionTime > queue[j + 1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];
            queue[j] = queue[k];
            queue[k] = tmp;
            k = j;
        }
    }

    /**
     * Establishes the heap invariant (described above) in the entire tree,
     * assuming nothing about the order of the elements prior to the call.
     */
    void heapify() {
        for (int i = size / 2; i >= 1; i--)
            fixDown(i);
    }
}

###任务调度 TimerThread###ui

// 计时器线程
class TimerThread extends Thread {

	// 新任务是否被安排
	boolean newTasksMayBeScheduled = true;

	// 任务队列
	private TaskQueue queue;

	TimerThread(TaskQueue queue) {
		this.queue = queue;
	}

	public void run() {
		try {
			mainLoop();
		} finally {
			// Someone killed this Thread, behave as if Timer cancelled
			synchronized (queue) {
				newTasksMayBeScheduled = false;
				queue.clear(); // Eliminate obsolete references
			}
		}
	}

	private void mainLoop() {
		while (true) {
			try {
				TimerTask task;
				boolean taskFired;
				synchronized (queue) {
					// Wait for queue to become non-empty
					while (queue.isEmpty() && newTasksMayBeScheduled)
						queue.wait();
					if (queue.isEmpty())
						break; // Queue is empty and will forever remain; die

					// Queue nonempty; look at first evt and do the right thing
					long currentTime, executionTime;
					task = queue.getMin();
					synchronized (task.lock) {
						if (task.state == TimerTask.CANCELLED) {
							queue.removeMin();
							continue; // No action required, poll queue again
						}
						currentTime = System.currentTimeMillis();
						executionTime = task.nextExecutionTime;
						if (taskFired = (executionTime <= currentTime)) {
							if (task.period == 0) { // Non-repeating, remove
								queue.removeMin();
								task.state = TimerTask.EXECUTED;
							} else { // Repeating task, reschedule
								queue
										.rescheduleMin(task.period < 0 ? currentTime
												- task.period
												: executionTime + task.period);
							}
						}
					}
					if (!taskFired) // Task hasn't yet fired; wait
						queue.wait(executionTime - currentTime);
				}
				if (taskFired) // Task fired; run it, holding no locks
					task.run();
			} catch (InterruptedException e) {
			}
		}
	}
}

###Timer类的主体和主要对外提供的方法###this

import java.util.*;
import java.util.Date;

public class Timer {
	// 定时任务队列
	private TaskQueue queue = new TaskQueue();

	// 计时器线程
	private TimerThread thread = new TimerThread(queue);

	private Object threadReaper = new Object() {
		protected void finalize() throws Throwable {
			synchronized (queue) {
				thread.newTasksMayBeScheduled = false;
				queue.notify(); // In case queue is empty.
			}
		}
	};

	// ID号做为线程的ID
	private static int nextSerialNumber = 0;

	private static synchronized int serialNumber() {
		return nextSerialNumber++;
	}

	public Timer() {
		this("Timer-" + serialNumber());
	}

	// 建立一个新计时器,能够指定其相关的线程做为守护程序运行。
	public Timer(boolean isDaemon) {
		this("Timer-" + serialNumber(), isDaemon);
	}

	public Timer(String name) {
		thread.setName(name);
		thread.start();
	}

	// 建立一个新计时器,其相关的线程具备指定的名称,而且能够指定做为守护程序运行。
	public Timer(String name, boolean isDaemon) {
		thread.setName(name);
		thread.setDaemon(isDaemon);
		thread.start();
	}

	// 安排在指定延迟后执行指定的任务。时间单位毫秒
	public void schedule(TimerTask task, long delay) {
		if (delay < 0)
			throw new IllegalArgumentException("Negative delay.");
		sched(task, System.currentTimeMillis() + delay, 0);
	}

	// 安排在指定的时间执行指定的任务。
	public void schedule(TimerTask task, Date time) {
		sched(task, time.getTime(), 0);
	}

	// 安排指定的任务从指定的延迟后开始进行重复的固定延迟执行。
	public void schedule(TimerTask task, long delay, long period) {
		if (delay < 0)
			throw new IllegalArgumentException("Negative delay.");
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, System.currentTimeMillis() + delay, -period);
	}

	// 安排指定的任务在指定的时间开始进行重复的固定延迟执行。
	public void schedule(TimerTask task, Date firstTime, long period) {
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, firstTime.getTime(), -period);
	}

	// 安排指定的任务在指定的延迟后开始进行重复的固定速率执行。
	public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
		if (delay < 0)
			throw new IllegalArgumentException("Negative delay.");
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, System.currentTimeMillis() + delay, period);
	}

	// 安排指定的任务在指定的时间开始进行重复的固定速率执行。
	public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) {
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, firstTime.getTime(), period);
	}
	private void sched(TimerTask task, long time, long period) {
		if (time < 0)
			throw new IllegalArgumentException("Illegal execution time.");
		// 同步代码块 ,对queue的访问须要同步
		synchronized (queue) {
			if (!thread.newTasksMayBeScheduled)
				throw new IllegalStateException("Timer already cancelled.");
			// 同步代码块,须要得到task的lock,锁
			synchronized (task.lock) {
				if (task.state != TimerTask.VIRGIN)
					throw new IllegalStateException(
							"Task already scheduled or cancelled");
				// 任务接下来执行的时刻
				task.nextExecutionTime = time;
				// 任务执行时间间隔周期
				task.period = period;
				// 任务已经安排,等待执行
				task.state = TimerTask.SCHEDULED;
			}
			// 加入计时器等待任务队列
			queue.add(task);
			//
			if (queue.getMin() == task)
				// 唤醒在此对象监视器上等待的单个线程。
				queue.notify();
		}
	}

	// 终止此计时器,丢弃全部当前已安排的任务。
	public void cancel() {
		synchronized (queue) {
			thread.newTasksMayBeScheduled = false;
			queue.clear();
			queue.notify(); // In case queue was already empty.
		}
	}

	// 今后计时器的任务队列中移除全部已取消的任务。
	public int purge() {
		int result = 0;

		synchronized (queue) {
			for (int i = queue.size(); i > 0; i--) {
				if (queue.get(i).state == TimerTask.CANCELLED) {
					queue.quickRemove(i);
					result++;
				}
			}

			if (result != 0)
				queue.heapify();
		}

		return result;
	}
}
相关文章
相关标签/搜索