上一节在阅读了EventBus的消息发送后,停在了postToSubscription方法上:async
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
在不一样的模式下,当前方法在不一样线程中调用,会使用不一样的Poster。上次对这一点没有深究,今天来粗略的看一看。ide
从上面的代码中,咱们能够看到,除了直接执行方法 invokeSubscriber(subscription, event);外,还有几个特殊的Poster,分别是mainThreadPoster,backgroundPoster,和asyncPoster。oop
咱们来一一阅读。post
mainThreadPoster的类叫做HandlerPoster,代码不长,总共80来行。ui
final class HandlerPoster extends Handler { private final PendingPostQueue queue; private final int maxMillisInsideHandleMessage; private final EventBus eventBus; private boolean handlerActive; HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) { super(looper); this.eventBus = eventBus; this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage; queue = new PendingPostQueue(); } void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } } @Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } } }
能够看到HandlerPoster继承了Handler。方法enqueue主要作了两件事。this
1、将事件请求放入请求队列queue中。spa
2、发送一个message到MainLooper中。线程
每当HandlerPoster发送message,其中的handleMessage就要开始工做。EventBus为其设置了最大工做时间。在工做时间内,程序会不断地从queue中poll出请求,在主线程中执行它,直到queue队列为空,或是到达最大工做时间,才会结束。咱们能够看到,HandlerPoster其实就是对消息的发送作了处理,经过Handler,将其置于MainThread中来执行。code
BackgroundPoster:blog
BackgroundPoster则是实现了Runnable接口。
final class BackgroundPoster implements Runnable { private final PendingPostQueue queue; private final EventBus eventBus; private volatile boolean executorRunning; BackgroundPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } } @Override public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { Log.w("Event", Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } } }
BackgroundPoster也维护了一个请求队列。与HandlerPoster不一样的是,它再也不使用message来发动任务的执行。而是eventBus.getExecutorService().execute(this);这个方法最终能够追溯到EventBusBuilder中的DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();也就是说,在BackgroundPoster中执行的任务必定不在主线程。另外一个与HandlerPoster不一样的点是,BackgroundPoster没有最大工做时间,但它有个最大等待时间——1000ms。若是队列为空,在等待1000ms后,队列中依然没有新的事件加入,再表示事件请求已所有执行。
AsyncPoster:
AsyncPoster的实现最为简单,它的功能是,不能当前处于什么线程,必定都会新开一个线程来执行这个任务。
class AsyncPoster implements Runnable { private final PendingPostQueue queue; private final EventBus eventBus; AsyncPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this); } @Override public void run() { PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); } }
代码比较简单,咱们能够看出AsyncPoster是在BackgroundPoster上作的减法。去掉了线程保护(由于新开),去掉了wait(也是由于新开)。
经过以上阅读,咱们大体对EventBus的事件发送机制有了了解。下一节,咱们将开始阅读,它的订阅机制。
Done~