本文是系列文章《Android和iOS开发中的异步处理》的第四篇。在本篇文章中,咱们主要讨论在客户端编程中常用的队列结构,它的异步编程方式以及相关的接口设计问题。java
前几天,有位同事跑过来一块儿讨论一个技术问题。状况是这样的,他最近在开发一款手游,用户在客户端上的每次操做都须要向服务器同步数据。原本按照传统的网络请求处理方式,用户发起操做后,须要等待操做完成,这时界面要显示一个请求等待的过程(好比转菊花)。当请求完成了,客户端显示层才更新,用户也才能发起下一个操做。可是,这个游戏要求用户能在短期内连续作不少操做。若是每一个操做都要经历一个请求等待的过程,无疑体验是很糟糕的。android
其实呢,这里就须要一个操做任务队列。用户不用等待一个操做完成,而是只要把操做放入队列里,就能够继续进行下一步操做了。只是,当队列中有操做出错时,须要进入一个统一的错误处理流程。固然,服务器也要配合进行一些处理,好比要更加慎重地对待操做去重问题。git
本文要讨论的就是跟队列的设计和实现有关的那些问题。程序员
注:本系列文章中出现的代码已经整理到GitHub上(持续更新),代码库地址为:github
其中,当前这篇文章中出现的Java代码,位于com.zhangtielei.demos.async.programming.queueing这个package中。编程
在客户端编程中,使用队列的场景实际上是不少的。这里咱们列举其中几个。安全
为了讨论方便,咱们把这种对一系列操做进行排队,并具有必定失败重试能力的队列称为“任务队列”。服务器
下面本文分三个章节来讨论异步任务和任务队列的相关话题。微信
在多线程的环境下,提到队列就不能不提TSQ。它是一个很经典的工具,在不一样的线程之间提供了一条有序传输数据的通道。它的结构图以下所示。markdown
消费者和生产者分属不一样的线程,这样消费者和生产者才能解耦,生产不至于被消费所阻塞。若是把TSQ用于任务队列,那么生产至关于用户的操做产生了任务,消费至关于任务的启动和执行。
消费者线程运行在一个循环当中,它不停地尝试从队列里取数据,若是没有数据,则阻塞在队列头上。这种阻塞操做须要依赖操做系统的一些原语。
利用队列进行解耦,是一个很重要的思想。说远一点,TSQ的思想推广到进程之间,就至关于在分布式系统里常用的Message Queue。它对于异构服务之间的解耦,以及屏蔽不一样服务之间的性能差别,能够起到关键做用。
而TSQ在客户端编程中比较少见,缘由包括:
咱们在这里提到TSQ,主要是由于它比较经典,也可以和其它方式作一个对比。咱们在这里就不给出它的源码演示了,想了解细节的同窗能够参见GitHub。GitHub上的演示代码使用了JDK中现成的TSQ的实现:LinkedBlockingQueue。
如上图所示,生产者和消费者都运行在一个线程,即主线程。按照这种思路来实现任务队列,咱们须要执行的任务自己必须是异步的,不然整个队列的任务就无法异步化。
咱们定义要执行的异步任务的接口以下:
public interface Task { /** * 惟一标识当前任务的ID * @return */ String getTaskId(); /** * 因为任务是异步任务, 那么start方法被调用只是启动任务; * 任务完成后会回调TaskListener. * * 注: start方法需在主线程上执行. */ void start(); /** * 设置回调监听. * @param listener */ void setListener(TaskListener listener); /** * 异步任务回调接口. */ interface TaskListener { /** * 当前任务完成的回调. * @param task */ void taskComplete(Task task); /** * 当前任务执行失败的回调. * @param task * @param cause 失败缘由 */ void taskFailed(Task task, Throwable cause); } }复制代码
因为Task
是一个异步任务,因此咱们为它定义了一个回调接口TaskListener
。
getTaskId
是为了获得一个能惟一标识当前任务的ID,便于对不一样任务进行精确区分。
另外,为了更通用的表达失败缘由,咱们这里选用一个Throwable对象来表达(注:在实际编程中这未必是一个值得效仿的作法,具体状况请具体分析)。
有人可能会说:这里把Task
接口定义成异步的,那若是想执行一个同步的任务该怎么办?这其实很好办。把同步任务改形成异步任务是很简单的,有不少种方法(反过来却很难)。
任务队列的接口,定义以下:
public interface TaskQueue { /** * 向队列中添加一个任务. * @param task */ void addTask(Task task); /** * 设置监听器. * @param listener */ void setListener(TaskQueueListener listener); /** * 销毁队列. * 注: 队列在最后不用的时候, 应该主动销毁它. */ void destroy(); /** * 任务队列对外监听接口. */ interface TaskQueueListener { /** * 任务完成的回调. * @param task */ void taskComplete(Task task); /** * 任务最终失败的回调. * @param task * @param cause 失败缘由 */ void taskFailed(Task task, Throwable cause); } }复制代码
任务队列TaskQueue
自己的操做也是异步的,addTask
只是将任务放入队列,至于它何时完成(或失败),调用者须要监听TaskQueueListener
接口。
须要注意的一点是,TaskQueueListener
的taskFailed
,与前面TaskListener
的taskFailed
不一样,它表示任务在通过必定次数的失败后,最终放弃重试从而最终失败。然后者只表示那个任务一次执行失败。
咱们重点讨论TaskQueue
的实现,而Task
的实现咱们这里不关心,咱们只关心它的接口。TaskQueue
的实现代码以下:
public class CallbackBasedTaskQueue implements TaskQueue, Task.TaskListener { private static final String TAG = "TaskQueue"; /** * Task排队的队列. 不须要thread-safe */ private Queue<Task> taskQueue = new LinkedList<Task>(); private TaskQueueListener listener; private boolean stopped; /** * 一个任务最多重试次数. * 重试次数超过MAX_RETRIES, 任务则最终失败. */ private static final int MAX_RETRIES = 3; /** * 当前任务的执行次数记录(当尝试超过MAX_RETRIES时就最终失败) */ private int runCount; @Override public void addTask(Task task) { //新任务加入队列 taskQueue.offer(task); task.setListener(this); if (taskQueue.size() == 1 && !stopped) { //当前是第一个排队任务, 当即执行它 launchNextTask(); } } @Override public void setListener(TaskQueueListener listener) { this.listener = listener; } @Override public void destroy() { stopped = true; } private void launchNextTask() { //取当前队列头的任务, 但不出队列 Task task = taskQueue.peek(); if (task == null) { //impossible case Log.e(TAG, "impossible: NO task in queue, unexpected!"); return; } Log.d(TAG, "start task (" + task.getTaskId() + ")"); task.start(); runCount = 1; } @Override public void taskComplete(Task task) { Log.d(TAG, "task (" + task.getTaskId() + ") complete"); finishTask(task, null); } @Override public void taskFailed(Task task, Throwable error) { if (runCount < MAX_RETRIES && !stopped) { //能够继续尝试 Log.d(TAG, "task (" + task.getTaskId() + ") failed, try again. runCount: " + runCount); task.start(); runCount++; } else { //最终失败 Log.d(TAG, "task (" + task.getTaskId() + ") failed, final failed! runCount: " + runCount); finishTask(task, error); } } /** * 一个任务最终结束(成功或最终失败)后的处理 * @param task * @param error */ private void finishTask(Task task, Throwable error) { //回调 if (listener != null && !stopped) { try { if (error == null) { listener.taskComplete(task); } else { listener.taskFailed(task, error); } } catch (Throwable e) { Log.e(TAG, "", e); } } task.setListener(null); //出队列 taskQueue.poll(); //启动队列下一个任务 if (taskQueue.size() > 0 && !stopped) { launchNextTask(); } } }复制代码
在这个实现中,咱们须要注意的几点是:
offer
, peek
, take
)都运行在主线程,因此队列数据结构再也不须要线程安全。咱们选择了LinkedList的实现。addTask
的时候,若是原来队列为空(当前任务是第一个任务),那么启动它;MAX_RETRIES
,才算最终失败。runCount
记录了当前任务的累计执行次数。CallbackBasedTaskQueue
的代码揭示了任务队列的基本实现模式。
任务队列对于失败任务的重试策略,大大提升了最终成功的几率。在GitHub上的演示程序中,我把Task
的失败几率设置得很高(高达80%),在重试3次的配置下,当任务执行的时候仍然有比较大的几率能最终执行成功。
关于RxJava到底有什么用?网上有不少讨论。
有人说,RxJava就是为了异步。这个固然没错,但说得不具体。
也有人说,RxJava的真正好处就是它提供的各类lift变换。还有人说,RxJava最大的用处是它的Schedulers机制,可以方便地切换线程。其实这些都不是革命性的关键因素。
那关键的是什么呢?我我的认为,是它对于回调接口设计产生的根本性的影响:它消除了为每一个异步接口单独定义回调接口的必要性。
这里立刻就有一个例子。咱们使用RxJava对TaskQueue
接口从新进行改写。
public interface TaskQueue { /** * 向队列中添加一个任务. * * @param task * @param <R> 异步任务执行完要返回的数据类型. * @return 一个Observable. 调用者经过这个Observable获取异步任务执行结果. */ <R> Observable<R> addTask(Task<R> task); /** * 销毁队列. * 注: 队列在最后不用的时候, 应该主动销毁它. */ void destroy(); }复制代码
咱们仔细看一看这个修改后的TaskQueue
接口定义。
TaskQueueListener
没有了。addTask
原来没有返回值,如今返回了一个Observable。调用者拿到这个Observable,而后去订阅它(subscribe),就能得到任务执行结果(成功或失败)。这里的改动很关键。原本addTask
什么也不返回,要想得到结果必须监听一个回调接口,这是典型的异步任务的运做方式。但这里返回一个Observable以后,让它感受上很是相似一个同步接口了。说得再抽象一点,这个Observable是咱们站在当下对于将来的一个指代,原本尚未运行的、发生在将来的虚无缥缈的任务,这时候有一个实实在在的东西被咱们抓在手里了。并且咱们还能对它在当下就进行不少操做,并能够和其它Observable结合。这是这一思想真正的强大之处。相应地,Task
接口原本也是一个异步接口,天然也能够用这种方式进行修改:
/** * 异步任务接口定义. * * 再也不使用TaskListener传递回调, 而是使用Observable. * * @param <R> 异步任务执行完要返回的数据类型. */ public interface Task <R> { /** * 惟一标识当前任务的ID * @return */ String getTaskId(); /** * * 启动任务. * * 注: start方法需在主线程上执行. * * @return 一个Observable. 调用者经过这个Observable获取异步任务执行结果. */ Observable<R> start(); }复制代码
这里把改成RxJava的接口讨论清楚了,具体的队列实现反而不重要了。具体实现代码就不在这里讨论了,想了解详情的同窗仍是参见GitHub。注意GitHub的实现中用到了一个小技巧:把一个异步的任务封装成Observable,咱们可使用AsyncOnSubscribe。
咱们在文章开头讲述了TSQ,并指出它在客户端编程中不多被使用。但并非说在客户端环境中TSQ就没有存在的意义。
实际上,客户端的Run Loop(即Android的Looper)自己就是一个TSQ,要否则它也无法在不一样线程之间安全地传递消息和调度任务。正是由于客户端有了一个Run Loop,咱们才有可能使用无锁的方式来实现任务队列。因此说,咱们在客户端的编程,老是与TSQ有着千丝万缕的联系。
顺便说一句,Android中的android.os.Looper,最终会依赖Linux内核中大名鼎鼎的epoll事件机制。
本文的核心任务是要讲解任务队列的异步编程方式,因此忽略了一些设计细节。若是你要实现一个生产环境能使用的任务队列,可能还须要考虑如下这些点:
本文最后运用了RxJava对任务队列进行了重写。咱们确实将接口简化了许多,省去了回调接口的设计,也让调用者能用统一的方式来处理异步任务。
可是,咱们也须要注意到RxJava带来的一些问题:
addTask
中,而是有所延迟,延迟到调用者的subscribe开始执行后。并且其执行线程环境有可能受到调用者对于Schedulers的设置的影响(好比经过subscribeOn),有不在主线程执行的风险。考虑到RxJava带来的这些问题,若是我要实现一个完整功能的任务队列或者其它复杂的异步任务,特别是要把它开源出来的的时候,我有可能不会让它对RxJava产生绝对的依赖。而是有可能像Retrofit那样,同时支持本身的轻量的异步机制和RxJava。
在本文结束以前,我再提出一个有趣的开放性问题。本文GitHub上给出的代码大量使用了匿名类(至关于Java 8的lambda表达式),这会致使对象之间的引用关系变得复杂。那么,对于这些对象的引用关系的分析,会是一个颇有趣的话题。好比,这些引用关系开始是如何随着程序执行创建起来的,最终销毁的时候又是如何解除的?有没有内存泄露呢?欢迎留言讨论。
在下一篇,咱们将讨论有关异步任务更复杂的一个问题:异步任务的取消。
(完)
其它精选文章: