AsyncTask源码解析

前言

使用AsyncTask可以很容易的实如今子线程执行耗时操做,而后在主线程中更新进度,任务完成后能在主线程中收到结果,其提供了如下几个主要方,咱们先从一个示例开始java

  • onPreExecute 在子线程执行任务前被调用,主线程调用
  • doInBackground 在线程池中执行,子线程调用
  • onPostExecute 在子线程执行完毕后调用,若是Task被取消了将不会被调用
  • publishProgress 用于在doInBackground中调用触发onProgressUpdate,通常子线程调用
  • onProgressUpdate 由publishProgress触发

示例

假设应用程序须要进行更新,须要下载新版本APK,在下载过程当中须要实时更新进度,这时咱们就能够使用AsyncTask,代码以下,注意:须要在清单文件中加上网络和写文件权限android

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        val pb = findViewById<ProgressBar>(R.id.pb)
        DownloadTask(pb).execute("http://down.360safe.com/360mse/360mse_nh00002.apk")
    }
}
class DownloadTask(pb: ProgressBar): AsyncTask<String, Int, Unit>() {
    private val mPb = WeakReference(pb)
    private val mTag = "download"
    override fun doInBackground(vararg params: String?) {
        val url = URL(params[0])
        val path = "${Environment.getExternalStorageDirectory()}/360safe.apk"
        val conn = url.openConnection() as HttpURLConnection
        if (conn.responseCode.toString().startsWith("20")) {
            var inputStream: InputStream? = null
            var outputStream: OutputStream? = null
            try {
                inputStream = conn.inputStream
                outputStream = BufferedOutputStream(FileOutputStream(File(path)))
                val totalSize = conn.contentLength
                val size = 1024
                val buffer = ByteArray(size)
                var downloadedSize = 0
                var length = inputStream.read(buffer)
                while (length != -1) {
                    downloadedSize += size
                    outputStream.write(buffer, 0, length)
                    onProgressUpdate(totalSize, downloadedSize)
                    length = inputStream.read(buffer)
                }
            } catch (e: Exception) {
                e.printStackTrace()
            } finally {
                inputStream?.close()
                outputStream?.close()
            }
        }
    }
    override fun onProgressUpdate(vararg values: Int?) {
        mPb.get()?.max = values[0]!!
        mPb.get()?.progress = values[1]!!
    }
    override fun onPostExecute(result: Unit?) {
        Log.d(mTag, "下载完成")
    }
}
复制代码

这样就能在子线程下载APK,而后实时在主线程更新进度,下面看看AsyncTask的源码bash

源码

首先来看看AsyncTask的构造方法网络

public AsyncTask() {
    this((Looper) null);
}
@hide
public AsyncTask(@Nullable Handler handler) {
    this(handler != null ? handler.getLooper() : null);
}
@hide
public AsyncTask(@Nullable Looper callbackLooper) {
    mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
        ? getMainHandler()
        : new Handler(callbackLooper);
    mWorker = new WorkerRunnable<Params, Result>() {
        public Result call() throws Exception {
            mTaskInvoked.set(true);
            Result result = null;
            try {
                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                result = doInBackground(mParams);
                Binder.flushPendingCommands();
            } catch (Throwable tr) {
                mCancelled.set(true);
                throw tr;
            } finally {
                postResult(result);
            }
            return result;
        }
    };
    mFuture = new FutureTask<Result>(mWorker) {
        @Override
        protected void done() {
            try {
                postResultIfNotInvoked(get());
            } catch (InterruptedException e) {
                android.util.Log.w(LOG_TAG, e);
            } catch (ExecutionException e) {
                throw new RuntimeException("An error occurred while executing doInBackground()",
                        e.getCause());
            } catch (CancellationException e) {
                postResultIfNotInvoked(null);
            }
        }
    };
}
复制代码

能够看出虽然有三个构造器可是咱们只能使用无参构造方法,其它两个构造方法都是隐藏的,其实参数不管是Handler也好Looper也好,最终只是想要一个Looper作为内部Handler的入参,这里因为callbackLooper为null,因此会调用getMainHandler并将结果赋值给mHandler,而后新建了mWorkermFuture,先来看看getMainLooperide

private static Handler getMainHandler() {
    synchronized (AsyncTask.class) {
        if (sHandler == null) {
            sHandler = new InternalHandler(Looper.getMainLooper());
        }
        return sHandler;
    }
}
复制代码

继续看看InternalHandleroop

private static class InternalHandler extends Handler {
    public InternalHandler(Looper looper) {
        super(looper);
    }
    @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
    @Override
    public void handleMessage(Message msg) {
        AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
        switch (msg.what) {
            case MESSAGE_POST_RESULT:
                // There is only one result
                result.mTask.finish(result.mData[0]);
                break;
            case MESSAGE_POST_PROGRESS:
                result.mTask.onProgressUpdate(result.mData);
                break;
        }
    }
}
复制代码

这里能够看出当进度改变或者任务结束的时候都会发送消息过来在这里回调,而后看看AsyncTask的execute方法post

public final AsyncTask<Params, Progress, Result> execute(Params... params) {
    return executeOnExecutor(sDefaultExecutor, params);
}
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;
    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }
    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}
复制代码

又调用了executeOnExecutor将params和一个SerialExecutor实例传入ui

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }
    mStatus = Status.RUNNING;
    onPreExecute();
    mWorker.mParams = params;
    exec.execute(mFuture);
    return this;
}
复制代码

该方法主要作了如下几件事情this

  1. 判断了当前Task的状态若是正在运行或者已经运行结束了就直接抛出异常
  2. 将当前Task的状态改成运行中,调用onPreExecute该方法是一个空方法交由子类实现能够在执行任务以前经过该方法作一些操做,
  3. 将参数赋值给mWorker.mParams
  4. mFuture丢给SerialExecutor进行执行
  5. 返回当前AsyncTask实例

接下来看看SerialExecutor.execute作了什么url

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;
    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }
    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}
复制代码

首先往mTasks里面添加了一个Runnable实例,而后判断当前是否有任务在执行,若是没有就调用scheduleNext执行任务,该方法会从队列中取出第一个Runnable实例丢给THREAD_POOL_EXECUTOR进行执行,而当一个Runnable执行完毕后又会调用scheduleNext执行下一个Runnable,因此说同一时间只会有一个Runnable执行,下面来看看THREAD_POOL_EXECUTOR

public static final Executor THREAD_POOL_EXECUTOR;
static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);
    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
        new LinkedBlockingQueue<Runnable>(128);
复制代码

咱们看到THREAD_POOL_EXECUTOR是一个核心线程为2-4,最大线程数为处理器数*2+1,线程超时时间30秒,容许核心线程超时的线程池,那么咱们有个疑问刚才不是说是串行执行Runnable的吗?既然是一个个执行的搞个SingleThreadExecutor不就好了?答案是若是咱们调用execute方法确实是串行执行的,可是咱们也能够直接调用executeOnExecutor方法将THREAD_POOL_EXECUTOR当作入参这样咱们的Task就能够并行执行的了,下面代码调用到了mFuture.run,咱们直接来看看FutureTask的构造方法

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
复制代码

这里的callable其实就是在AsyncTask的构造方法中建立的mWorker,这里将其赋值给了callable变量,而后将状态变动为NEW,而后来看看其run方法

public void run() {
    if (state != NEW ||
        !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
复制代码

首先判断当前状态是否是NEW不是就直接返回,而后调用mWorker.call,若是执行成功就调用set设置结果,执行失败就调用setException设置异常,先来看看mWorker.call

public Result call() throws Exception {
    mTaskInvoked.set(true);
    Result result = null;
    try {
        Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
        result = doInBackground(mParams);
        Binder.flushPendingCommands();
    } catch (Throwable tr) {
        mCancelled.set(true);
        throw tr;
    } finally {
        postResult(result);
    }
    return result;
}
复制代码

该方法先将mTaskInvoked设置为true,代表该Task已经获得执行,而后设置优先级为后台线程,再而后调用doInBackground获取到结果,若是执行失败会将mCancelled设置为true,而后再抛出异常,最后调用postResult

private Result postResult(Result result) {
    @SuppressWarnings("unchecked")
    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
            new AsyncTaskResult<Result>(this, result));
    message.sendToTarget();
    return result;
}
复制代码

该方法只是发了一个消息,根据前面讲到的InternalHandler的处理逻辑会调用finish

private void finish(Result result) {
    if (isCancelled()) {
        onCancelled(result);
    } else {
        onPostExecute(result);
    }
    mStatus = Status.FINISHED;
}
复制代码

这里判断若是任务执行失败就调用onCancelled执行成功就调用onPostExecute,最后将状态置为FINISHED,咱们再回到FutureTask的run方法,还没分析执行call方法抛出异常的状况来看看setException

protected void setException(Throwable t) {
    if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
        outcome = t;
        U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
复制代码

判断状态是不是NEW或者COMPLETING是的话就把当前的状态改为EXCEPTIONAL而后调用finishCompletion

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (U.compareAndSwapObject(this, WAITERS, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}
复制代码

该方法将阻塞队列状况而后调用done方法其实一个空实现,mFuture实现了它咱们来看看

mFuture = new FutureTask<Result>(mWorker) {
    @Override
    protected void done() {
        try {
            postResultIfNotInvoked(get());
        } catch (InterruptedException e) {
            android.util.Log.w(LOG_TAG, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("An error occurred while executing doInBackground()",
                    e.getCause());
        } catch (CancellationException e) {
            postResultIfNotInvoked(null);
        }
    }
};
private void postResultIfNotInvoked(Result result) {
    final boolean wasTaskInvoked = mTaskInvoked.get();
    if (!wasTaskInvoked) {
        postResult(result);
    }
}
复制代码

若是Task还没获得执行就被取消掉了那么会调用postResult,到此为止基本流程已经走完了,可是还有一个方法在这个流程中没讲到那就是publishProgress

protected final void publishProgress(Progress... values) {
    if (!isCancelled()) {
        getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                new AsyncTaskResult<Progress>(this, values)).sendToTarget();
    }
}
public void handleMessage(Message msg) {
    AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
    switch (msg.what) {
        case MESSAGE_POST_RESULT:
            // There is only one result
            result.mTask.finish(result.mData[0]);
            break;
        case MESSAGE_POST_PROGRESS:
            result.mTask.onProgressUpdate(result.mData);
            break;
    }
}
复制代码

根据InternalHandler咱们能够看出实际上是回调了onPregressUpdate,至此所有流程都走完了,来总结一下

总结

  • AsyncTask内部切换线程的原理也是线程池加Handler
  • 默认状况下AsyncTask是串行执行的,也就是说假设你同时开启三个AsyncTask,它们会一个个执行,而不会同时执行要想作到并行执行须要手动调用executeOnExecutor
相关文章
相关标签/搜索