RxDownload2 源码解析(二)

源码解析,如需转载,请注明做者:Yuloran (t.cn/EGU6c76)java

前言

造轮子者:Season_zlc并发

本文主要讲述 RxDownload2 的线程调度异步

下载任务分发线程

顾名思义,就是分发下载任务的线程。该线程运行在 DownloadService 中,从业务上看,DownloadService应当仅被 start() & bind() 一次。任务分发线程,在 onBind() 时建立:ide

  1. start & bind service [-> RxDownload.java]
/** * start and bind service. * * @param callback Called when service connected. */
    private void startBindServiceAndDo(final ServiceConnectedCallback callback) {
        Intent intent = new Intent(context, DownloadService.class);
        intent.putExtra(DownloadService.INTENT_KEY, maxDownloadNumber);
        context.startService(intent);
        context.bindService(intent, new ServiceConnection() {
            @Override
            public void onServiceConnected(ComponentName name, IBinder binder) {
                DownloadService.DownloadBinder downloadBinder
                        = (DownloadService.DownloadBinder) binder;
                downloadService = downloadBinder.getService();
                context.unbindService(this);
                bound = true;
                callback.call();
            }

            @Override
            public void onServiceDisconnected(ComponentName name) {
                //注意!!这个方法只会在系统杀掉Service时才会调用!!
                bound = false;
            }
        }, Context.BIND_AUTO_CREATE);
    }
复制代码

上述代码有个细节,onServiceConnected() 中立刻调了 unbindService()post

  1. onBind [-> DownloadService.java]
@Nullable
    @Override
    public IBinder onBind(Intent intent) {
        log("bind Download Service");
        startDispatch();
        return mBinder;
    }
复制代码
  1. startDispatch() [-> DownloadService.java]
/** * start dispatch download queue. */
    private void startDispatch() {
        disposable = Observable
                .create(new ObservableOnSubscribe<DownloadMission>() {
                    @Override
                    public void subscribe(ObservableEmitter<DownloadMission> emitter) throws Exception {
                        DownloadMission mission;
                        while (!emitter.isDisposed()) {
                            try {
                                log(WAITING_FOR_MISSION_COME);
                                mission = downloadQueue.take();
                                log(Constant.MISSION_COMING);
                            } catch (InterruptedException e) {
                                log("Interrupt blocking queue.");
                                continue;
                            }
                            emitter.onNext(mission);
                        }
                        emitter.onComplete();
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Consumer<DownloadMission>() {
                    @Override
                    public void accept(DownloadMission mission) throws Exception {
                        mission.start(semaphore);
                    }
                });
    }
复制代码
  • 上述代码就是下载任务分发线程的实现。其中 .subscribeOn(Schedulers.newThread()) 代表该线程经过 new Thread() 的方式产生的
  • 下载任务用一个阻塞队列来维护,阻塞队列内部已经实现了同步机制,因此无需担忧并发问题
  • 只要没有取消订阅,该线程就会不停的尝试从阻塞队列中获取下载任务并发射。若是队列为空,就会一直阻塞,直到有新任务入队
  • 上述代码是不严谨的,对 disposable 从新赋值前,没有先尝试对其取消订阅。若是屡次调用 bindService() ,就会出现线程泄露

下载任务执行线程

顾名思义,就是下载任务的执行线程。该线程运行在 Schedulers.io() 线程池上。入参信号量用来限制同时下载的最大任务数。ui

  1. start(final Semaphore semaphore) [-> SingleMission.java]
@Override
    public void start(final Semaphore semaphore) {
        disposable = start(bean, semaphore, new MissionCallback() {
            @Override
            public void start() {
                // 回调开始下载
                if (callback != null) callback.start();
            }

            @Override
            public void next(DownloadStatus value) {
                // 回调下载中
                status = value;
                processor.onNext(started(value));
                if (callback != null) callback.next(value);
            }

            @Override
            public void error(Throwable throwable) {
                // 回调下载失败
                processor.onNext(failed(status, throwable));
                if (callback != null) callback.error(throwable);
            }

            @Override
            public void complete() {
                // 回调下载完成
                processor.onNext(completed(status));
                if (callback != null) callback.complete();
            }
        });
    }
复制代码
  1. start(DownloadBean bean, final Semaphore semaphore, final MissionCallback callback) [-> DownloadMission.java]
protected Disposable start(DownloadBean bean, final Semaphore semaphore, final MissionCallback callback) {
        return rxdownload.download(bean)
                .subscribeOn(Schedulers.io()) // 指定下载任务执行线程
                .doOnLifecycle(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        if (canceled.get()) {
                            dispose(disposable);
                        }

                        log(TRY_TO_ACQUIRE_SEMAPHORE);
                        // 申请信号量
                        semaphore.acquire();
                        log(ACQUIRE_SUCCESS);
                        
                        // 得到信号量后,需再次检测是否已经暂停下载
                        if (canceled.get()) {
                            // 已经暂停,则取消订阅,释放信号量
                            dispose(disposable);
                        } else {
                            callback.start();
                        }
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        // 取消订阅时,须要释放信号量
                        semaphore.release();
                    }
                })
                .subscribe(new Consumer<DownloadStatus>() {
                    @Override
                    public void accept(DownloadStatus value) throws Exception {
                         // 回调下载进度
                        callback.next(value);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        // 回调下载失败
                        callback.error(throwable);
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        // 回调下载完成
                        callback.complete();
                    }
                });
    }
复制代码

下载任务中断线程

顾名思义,就是中断下载任务的线程。包括暂停、删除、所有暂停、所有取消四个操做。这些操做也运行在 Schedulers.io() 线程池上。this

  1. pauseServiceDownload(final String missionId) [-> RxDownlaod.java]
/** * Pause download. * <p> * Pause a url or all tasks belonging to missionId. * * @param missionId url or missionId */
    public Observable<?> pauseServiceDownload(final String missionId) {
        // createGeneralObservable 是一个异步绑定下载服务的Observable,经过资源数为1的信号量实现强制同步
        return createGeneralObservable(new GeneralObservableCallback() {
            @Override
            public void call() {
                // 服务绑定后,调用服务的暂停下载
                downloadService.pauseDownload(missionId);
            }
        }).observeOn(AndroidSchedulers.mainThread());
    }
复制代码
  1. createGeneralObservable(final GeneralObservableCallback callback) [-> RxDownload.java]
/** * return general observable * * @param callback Called when observable created. * @return Observable */
    private Observable<?> createGeneralObservable(final GeneralObservableCallback callback) {
        // 方法名起的很差,应该叫 bindService
        return Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
                if (!bound) {
                    // 由于 onServiceConnected 是异步回调的,因此这里用了个资源数为1的信号量实现强制同步(CountDownLatch也能够实现强制同步)
                    semaphore.acquire();
                    if (!bound) {
                        startBindServiceAndDo(new ServiceConnectedCallback() {
                            @Override
                            public void call() {
                                // 服务绑定后,回调 callback
                                doCall(callback, emitter);
                                // 释放信号量
                                semaphore.release();
                            }
                        });
                    } else {
                        doCall(callback, emitter);
                        semaphore.release();
                    }
                } else {
                    doCall(callback, emitter);
                }
            }
        }).subscribeOn(Schedulers.io()); // 指定在 io 线程执行,因此暂停下载也是在这个线程执行
    }
复制代码

同理,删除下载也会先调用 createGeneralObservable(),因此删除操做也是在 Schedulers.io() 上执行的。url

总结

  • 一个单独的线程用来分发下载任务:Schedulers.newThread()
  • 每次从线程池中取一个线程执行下载任务:Schedulers.io()
  • 每次从线程池中取一个线程执行暂停、删除下载任务:Schedulers.io()

RxDownload2 系列文章:spa

相关文章
相关标签/搜索