源码解析,如需转载,请注明做者:Yuloran (t.cn/EGU6c76)java
造轮子者:Season_zlc并发
本文主要讲述 RxDownload2
的线程调度异步
顾名思义,就是分发下载任务的线程。该线程运行在 DownloadService
中,从业务上看,DownloadService
应当仅被 start() & bind()
一次。任务分发线程,在 onBind()
时建立:ide
/** * start and bind service. * * @param callback Called when service connected. */
private void startBindServiceAndDo(final ServiceConnectedCallback callback) {
Intent intent = new Intent(context, DownloadService.class);
intent.putExtra(DownloadService.INTENT_KEY, maxDownloadNumber);
context.startService(intent);
context.bindService(intent, new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName name, IBinder binder) {
DownloadService.DownloadBinder downloadBinder
= (DownloadService.DownloadBinder) binder;
downloadService = downloadBinder.getService();
context.unbindService(this);
bound = true;
callback.call();
}
@Override
public void onServiceDisconnected(ComponentName name) {
//注意!!这个方法只会在系统杀掉Service时才会调用!!
bound = false;
}
}, Context.BIND_AUTO_CREATE);
}
复制代码
上述代码有个细节,onServiceConnected()
中立刻调了 unbindService()
。post
@Nullable
@Override
public IBinder onBind(Intent intent) {
log("bind Download Service");
startDispatch();
return mBinder;
}
复制代码
/** * start dispatch download queue. */
private void startDispatch() {
disposable = Observable
.create(new ObservableOnSubscribe<DownloadMission>() {
@Override
public void subscribe(ObservableEmitter<DownloadMission> emitter) throws Exception {
DownloadMission mission;
while (!emitter.isDisposed()) {
try {
log(WAITING_FOR_MISSION_COME);
mission = downloadQueue.take();
log(Constant.MISSION_COMING);
} catch (InterruptedException e) {
log("Interrupt blocking queue.");
continue;
}
emitter.onNext(mission);
}
emitter.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<DownloadMission>() {
@Override
public void accept(DownloadMission mission) throws Exception {
mission.start(semaphore);
}
});
}
复制代码
.subscribeOn(Schedulers.newThread())
代表该线程经过 new Thread() 的方式产生的disposable
从新赋值前,没有先尝试对其取消订阅。若是屡次调用 bindService()
,就会出现线程泄露顾名思义,就是下载任务的执行线程。该线程运行在 Schedulers.io()
线程池上。入参信号量用来限制同时下载的最大任务数。ui
@Override
public void start(final Semaphore semaphore) {
disposable = start(bean, semaphore, new MissionCallback() {
@Override
public void start() {
// 回调开始下载
if (callback != null) callback.start();
}
@Override
public void next(DownloadStatus value) {
// 回调下载中
status = value;
processor.onNext(started(value));
if (callback != null) callback.next(value);
}
@Override
public void error(Throwable throwable) {
// 回调下载失败
processor.onNext(failed(status, throwable));
if (callback != null) callback.error(throwable);
}
@Override
public void complete() {
// 回调下载完成
processor.onNext(completed(status));
if (callback != null) callback.complete();
}
});
}
复制代码
protected Disposable start(DownloadBean bean, final Semaphore semaphore, final MissionCallback callback) {
return rxdownload.download(bean)
.subscribeOn(Schedulers.io()) // 指定下载任务执行线程
.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
if (canceled.get()) {
dispose(disposable);
}
log(TRY_TO_ACQUIRE_SEMAPHORE);
// 申请信号量
semaphore.acquire();
log(ACQUIRE_SUCCESS);
// 得到信号量后,需再次检测是否已经暂停下载
if (canceled.get()) {
// 已经暂停,则取消订阅,释放信号量
dispose(disposable);
} else {
callback.start();
}
}
}, new Action() {
@Override
public void run() throws Exception {
// 取消订阅时,须要释放信号量
semaphore.release();
}
})
.subscribe(new Consumer<DownloadStatus>() {
@Override
public void accept(DownloadStatus value) throws Exception {
// 回调下载进度
callback.next(value);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
// 回调下载失败
callback.error(throwable);
}
}, new Action() {
@Override
public void run() throws Exception {
// 回调下载完成
callback.complete();
}
});
}
复制代码
顾名思义,就是中断下载任务的线程。包括暂停、删除、所有暂停、所有取消四个操做。这些操做也运行在 Schedulers.io()
线程池上。this
/** * Pause download. * <p> * Pause a url or all tasks belonging to missionId. * * @param missionId url or missionId */
public Observable<?> pauseServiceDownload(final String missionId) {
// createGeneralObservable 是一个异步绑定下载服务的Observable,经过资源数为1的信号量实现强制同步
return createGeneralObservable(new GeneralObservableCallback() {
@Override
public void call() {
// 服务绑定后,调用服务的暂停下载
downloadService.pauseDownload(missionId);
}
}).observeOn(AndroidSchedulers.mainThread());
}
复制代码
/** * return general observable * * @param callback Called when observable created. * @return Observable */
private Observable<?> createGeneralObservable(final GeneralObservableCallback callback) {
// 方法名起的很差,应该叫 bindService
return Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
if (!bound) {
// 由于 onServiceConnected 是异步回调的,因此这里用了个资源数为1的信号量实现强制同步(CountDownLatch也能够实现强制同步)
semaphore.acquire();
if (!bound) {
startBindServiceAndDo(new ServiceConnectedCallback() {
@Override
public void call() {
// 服务绑定后,回调 callback
doCall(callback, emitter);
// 释放信号量
semaphore.release();
}
});
} else {
doCall(callback, emitter);
semaphore.release();
}
} else {
doCall(callback, emitter);
}
}
}).subscribeOn(Schedulers.io()); // 指定在 io 线程执行,因此暂停下载也是在这个线程执行
}
复制代码
同理,删除下载也会先调用 createGeneralObservable()
,因此删除操做也是在 Schedulers.io()
上执行的。url
Schedulers.newThread()
Schedulers.io()
Schedulers.io()
RxDownload2 系列文章:spa