EventBus源码阅读(二)——Poster

  上一节在阅读了EventBus的消息发送后,停在了postToSubscription方法上:async

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

   在不一样的模式下,当前方法在不一样线程中调用,会使用不一样的Poster。上次对这一点没有深究,今天来粗略的看一看。ide

  从上面的代码中,咱们能够看到,除了直接执行方法 invokeSubscriber(subscription, event);外,还有几个特殊的Poster,分别是mainThreadPoster,backgroundPoster,和asyncPoster。oop

  咱们来一一阅读。post

  mainThreadPoster的类叫做HandlerPoster,代码不长,总共80来行。ui

  

final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

  能够看到HandlerPoster继承了Handler。方法enqueue主要作了两件事。this

  1、将事件请求放入请求队列queue中。spa

  2、发送一个message到MainLooper中。线程

  每当HandlerPoster发送message,其中的handleMessage就要开始工做。EventBus为其设置了最大工做时间。在工做时间内,程序会不断地从queue中poll出请求,在主线程中执行它,直到queue队列为空,或是到达最大工做时间,才会结束。咱们能够看到,HandlerPoster其实就是对消息的发送作了处理,经过Handler,将其置于MainThread中来执行。code

 

  BackgroundPoster:blog

  BackgroundPoster则是实现了Runnable接口。

final class BackgroundPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

 

 

  BackgroundPoster也维护了一个请求队列。与HandlerPoster不一样的是,它再也不使用message来发动任务的执行。而是eventBus.getExecutorService().execute(this);这个方法最终能够追溯到EventBusBuilder中的DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();也就是说,在BackgroundPoster中执行的任务必定不在主线程。另外一个与HandlerPoster不一样的点是,BackgroundPoster没有最大工做时间,但它有个最大等待时间——1000ms。若是队列为空,在等待1000ms后,队列中依然没有新的事件加入,再表示事件请求已所有执行。

 

AsyncPoster:

  AsyncPoster的实现最为简单,它的功能是,不能当前处于什么线程,必定都会新开一个线程来执行这个任务。

  

class AsyncPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

}

 

   代码比较简单,咱们能够看出AsyncPoster是在BackgroundPoster上作的减法。去掉了线程保护(由于新开),去掉了wait(也是由于新开)。

 

  经过以上阅读,咱们大体对EventBus的事件发送机制有了了解。下一节,咱们将开始阅读,它的订阅机制。

 

Done~

相关文章
相关标签/搜索