storm定时器与java.util.Timer定时器比较类似。java.util.Timer定时器其实是个线程,定时调度所拥有的TimerTasks;storm定时器也有一个线程负责调度所拥有的"定时任务"。storm定时器的"定时任务"是一个vector类型的数据[time, callback, uuid],内有会有三个值,分别是时间、函数、和uuid,很好理解,时间表示该定时任务何时执行,函数表示要执行的函数,uuid用于标识该"定时任务"。"定时任务"被存放到定时器的PriorityQueue队列中(和PriorityBlockingQueue区别,在于没有阻塞机制,不是线程安全的)。优先级队列是堆数据结构的典型应用,若是不提供Comparator的话,优先队列中元素默认按天然顺序排列,也就是数字默认是小的在队列头,字符串则按字典序排列(参阅 Comparable),也能够根据 Comparator 来指定,这取决于使用哪一种构造方法。优先级队列不容许null元素。依靠天然排序的优先级队列还不容许插入不可比较的对象(这样作可能致使 ClassCastException)。固然也能够本身从新实现Comparator接口, 好比storm定时器就用reify从新实现了Comparator接口。storm定时器的执行过程比较简单,经过timer-thread,不断检查PriorityQueue里面时间最小的"定时任务"是否已经能够触发了, 若是能够(当前时间>=执行时间),就poll出来,调用callback,并sleep。storm定时器相关的函数均定义在timer.clj文件中,storm定时器是由mk-timer函数建立的,mk-timer函数定义以下:
mk-timer函数java
;; kill-fn函数会在timer-thread发生exception的时候被调用,timer-name标识定时器的名称 (defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil] ;; queue绑定PriorityQueue队列,建立PriorityQueue队列时指定队列初始容量为10,并指定一个Comparator比 ;;较器,Comparator比较器比较"定时任务"执行时间的大小,这样每次poll出执行时间最小的"定时任务", ;; PriorityQueue队列是一个依赖执行时间的小顶堆 (let [queue (PriorityQueue. 10 (reify Comparator (compare [this o1 o2] (- (first o1) (first o2))) (equals [this obj] true))) ;; active标识timer-thread是"active"的 active (atom true) ;; 建立一个锁,由于PriorityQueue并非线程安全的,因此经过这个锁,可使多线程互斥访问PriorityQueue lock (Object.) ;; notifier是一个java信号量,初始值为0,notifier信号量的主要功能就是当咱们调用cancel-timer函数中断 ;; 一个timer-thread时,等待timer-thread结束,当timer-thread结束前会release notifier信号量 notifier (Semaphore. 0) ;; thread-name绑定timer-thread线程名,没有指定时默认为"timer" thread-name (if timer-name timer-name "timer") ;; timer-thread线程 timer-thread (Thread. (fn [] ;; 当timer-thread为"active"即active=true时,进入while循环 (while @active (try ;; peek函数从PriorityQueue获取执行时间最小的"定时任务",但并不出队列。time-mil ;; lis绑定执行时间,elem绑定"定时任务"数据 (let [[time-millis _ _ :as elem] (locking lock (.peek queue))] ;; 若是elem不为nil且当前时间>=执行时间,那么先加锁,而后poll出该"定时任务", ;; 并将"定时任务"的callback函数绑定到afn,最后调用该函数;不然判断time-millis ;; 是否为nil。 ;; 咱们能够发现该定时器是软时间执行"定时任务"的,也就是说"定时任务"有可能被延 ;; 迟执行,同时若是afn函数执行时间比较长,那么会影响下一个"定时任务"的执行 (if (and elem (>= (current-time-millis) time-millis)) ;; It is imperative to not run the function ;; inside the timer lock. Otherwise, it is ;; possible to deadlock if the fn deals with ;; other locks, like the submit lock. (let [afn (locking lock (second (.poll queue)))] ;; 执行"定时任务"的callback函数 (afn)) ;; 该if语句是上面if语句的else分支,判断time-millis是否为nil,若是time-mill ;; is不为nil,则timer-thread线程sleep(执行时间-当前时间);不然sleep(1000) ;; 代表PriorityQueue中没有"定时任务" (if time-millis ;; If any events are scheduled, sleep until ;; event generation. If any recurring events ;; are scheduled then we will always go ;; through this branch, sleeping only the ;; exact necessary amount of time. (Time/sleep (- time-millis (current-time-millis))) ;; Otherwise poll to see if any new event ;; was scheduled. This is, in essence, the ;; response time for detecting any new event ;; schedulings when there are no scheduled ;; events. (Time/sleep 1000)))) (catch Throwable t ;; Because the interrupted exception can be ;; wrapped in a RuntimeException. ;; 检查是不是InterruptedException,若是是InterruptedException,说明线程是由 ;; 于接收interrupt信号而中断的,不作异常处理,不然调用kill-fn函数、修改线程 ;; 状态并抛出该异常 (when-not (exception-cause? InterruptedException t) (kill-fn t) (reset! active false) (throw t))))) ;; release notifier信号量,标识timer—thread运行结束 (.release notifier)) thread-name)] ;; 设置timer-thread为守护线程 (.setDaemon timer-thread true) ;; 设置timer-thread为最高优先级 (.setPriority timer-thread Thread/MAX_PRIORITY) ;; 启动timer-thread线程 (.start timer-thread) ;; 返回该定时器的"属性" {:timer-thread timer-thread :queue queue :active active :lock lock :cancel-notifier notifier}))
咱们能够经过调用cancel-timer函数中断一个timer-thread线程,cancel-timer函数定义以下:
cancel-timer函数web
(defn cancel-timer [timer] ;; 检查timer状态是不是"active",若是不是则抛出异常 (check-active! timer) ;; 加锁 (locking (:lock timer) ;; 将timer的状态active设置成false,即"dead" (reset! (:active timer) false) ;; 调用interrupt方法,中断线程,经过mk-timer函数咱们能够知道在线程的run方法内调用了sleep方法, ;; 当接收到中断新号后会抛出InterruptedException异常使线程退出 (.interrupt (:timer-thread timer))) ;; acquire timer中的notifier信号量,由于只有当线程结束前才会release notifier信号量,因此此处是等待线程;;; 结束 (.acquire (:cancel-notifier timer)))
check-active!函数定义以下:
check-active!函数shell
(defn- check-active! [timer] (when-not @(:active timer) (throw (IllegalStateException. "Timer is not active"))))
经过调用schedule函数和schedule-recurring函数咱们能够向storm定时器中添加"定时任务"。schedule函数定义以下:
schedule函数安全
(defnk schedule ;; timer绑定定时器,delay-secs绑定"定时任务"相对当前时间的延迟时间,afn绑定callback函数,check-active是;; 否须要检查定时器 [timer delay-secs afn :check-active true] ;; 检查定时器状态 (when check-active (check-active! timer)) (let [id (uuid) ^PriorityQueue queue (:queue timer)] ;; 加锁,执行时间=当前时间+延迟时间,将"定时任务"的vector类型数据添加到PriorityQueue队列中 (locking (:lock timer) (.add queue [(+ (current-time-millis) (secs-to-millis-long delay-secs)) afn id]))))
schedule-recurring函数定义以下:
chedule-recurring函数也很简单,与schedule函数的区别就是在"定时任务"的callback函数中又添加了一个相同的"定时任务"。schedule函数的语义能够理解成向定时器添加
一个"一次性任务",schedule-recurring函数的语义能够理解成向定时器添加"一个周期执行的定时任务"(开始执行时间=当前时间+延迟时间,而后每隔recur-secs执行一次)
schedule-recurring函数数据结构
(defn schedule-recurring [timer delay-secs recur-secs afn] (schedule timer delay-secs (fn this [] (afn) ; This avoids a race condition with cancel-timer. (schedule timer recur-secs this :check-active false))))
nimbus检查心跳和重分配任务的实现就是经过schedule-recurring函数向storm定时器添加了一个"周期任务"实现的。多线程
(schedule-recurring (:timer nimbus) 0 (conf NIMBUS-MONITOR-FREQ-SECS) (fn [] (when (conf NIMBUS-REASSIGN) (locking (:submit-lock nimbus) (mk-assignments nimbus))) (do-cleanup nimbus) ))