storm事件管理器定义在event.clj中,主要功能就是经过独立线程执行"事件处理函数"。咱们能够将"事件处理函数"添加到EventManager的阻塞队列中,EventManager的事件处理线程不断地从阻塞队列中获取"事件处理函数"并执行。java
EventManager协议
协议就是一组函数定义的集合,协议中函数的第一个参数必须为实现该协议的实例自己,相似于java中实例方法的第一个参数为this;协议相似于java中的接口。函数
(
defprotocol
EventManager
(
add
[
this
event-fn
])
(
waiting?
[
this
])
(
shutdown
[
this
]))
event-manager函数
(
defn
event-manager
"Creates a thread to respond to events. Any error will cause process to halt"
;; daemon?表示是否将事件处理线程设置成守护线程
[
daemon?
]
;; added表示已添加的"事件处理函数"的个数
(
let
[
added (
atom
0)
;; processed表示已处理的"事件处理函数"的个数
processed (
atom
0)
;; queue绑定事件管理器的阻塞队列LinkedBlockingQueue
^
LinkedBlockingQueue
queue (
LinkedBlockingQueue.)
;; 设置事件管理器的状态为"running"
running (
atom
true)
;; 建立事件处理线程。Clojure函数实现了Runnable和Callable接口,因此能够将Clojure函数做为参数传递给java.lang.Thread类的构造函数
runner (
Thread.
;; 事件处理线程循环检查事件处理器的状态是不是"running",若是是,就从阻塞队列中获取"事件处理函数",并执行;而后将processed加1
(
fn
[]
(
try-cause
(
while
@
running
(
let
[
r (
.take
queue
)]
(
r)
(
swap!
processed
inc)))
(
catch
InterruptedException
t
(
log-message
"Event manager interrupted"))
(
catch
Throwable
t
(
log-error
t
"Error when processing event")
(
exit-process!
20
"Error when processing an event"
)))))]
(
.setDaemon
runner
daemon?)
;; 启动事件处理线程
(
.start
runner)
;; 返回一个实现了EventManager协议的实例
(
reify
EventManager
;; add函数将"事件处理函数"添加到事件处理器的阻塞队列中
(
add
[
this
event-fn
]
;; should keep track of total added and processed to know if this is finished yet
(
when-not
@
running
(
throw (
RuntimeException.
"Cannot add events to a shutdown event manager")))
(
swap!
added
inc)
(
.put
queue
event-fn))
;; waiting?判断事件处理线程是否处于等待状态
(
waiting?
[
this
]
(
or (
Time/isThreadWaiting
runner)
(
=
@
processed
@
added)))
;; 关闭事件管理器
(
shutdown
[
this
]
(
reset!
running
false)
(
.interrupt
runner)
(
.join
runner)))))