AsyncTask的使用
- AsyncTask是对Thread和Handler的封装,它在线程池中执行后台任务,然后把任务的进度和最终结果传递给UI线程处理。
- AsyncTask并不适合非常耗时的任务(也就几分钟),如果是非常耗时的操作,还是使用Executor等线程池。
- AsyncTask 是一个抽象的类,有三个泛型,Params、Progress、Result。Params表示传入参数的类型,Progress表示后台任务执行的进度类型,Rsult表示后台任务的返回结果的类型,如果不需要传递具体类型,可以传递Void。
四个核心的方法。
- onPreExecute(),在UI线程中执行,在异步任务执行之前调用。
- doInBackground(Params…params),在线程池中执行,用于执行异步任务。调用publishProgress方法,会调用onProgressUpdate方法。
- onProgressUpdate(Progress…values)方法在UI线程调用,一般用做更新进度条。
- onPostExecute(Result result),result是doInBackground的返回值,在Ui线程执行。
源码分析
从execute()方法进入
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
这里调用的是executeOnExecutor传入了sDefaultExecutor(默认线程池),和我们的参数params。
sDefaultExecutor 变量
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
//串行线程池
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
默认的线程池–sDefaultExecutor 是由SerialExecutor赋值,而SerialExecutor是一个串行线程池。
private static class SerialExecutor implements Executor {
//双端队列
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
//往双端队列队尾插入元素。
mTasks.offer(new Runnable() {
public void run() {
try {
//串行机制
r.run();
} finally {
//从队头开始执行
scheduleNext();
}
}
});
// 第一次入队列时mActivie为空,因此需要手动调用scheduleNext方法
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
//从双端队列队头中取出,执行execute
if ((mActive = mTasks.poll()) != null) {
// /最终执行在并行线程池中
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
/**
* An {@link Executor} that can be used to execute tasks in parallel.
* 并行线程池
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
SerialExecutor实现了Executor的接口,mTasks是一个存储Runnable的双端队列,在execute()方法中,先往mTasks中插入Runnable元素,第一次进入队列时,mActivity为null,所以需要手动调用scheduleNext方法。
而scheduleNext方法,则是将插入的Runnable对象从队列中取出并传给THREAD_POOL_EXECUTOR并行线程池来执行该Runnable对象,然后利用try finally机制,先执行完try代码块中的代码,在执行完毕之后必然再执行finally代码块中的代码,然后以此往复循环,达到串行运行的目的,当然线程的具体执行还是再并行线程池中进行的。
同时,因为是静态的成员变量,会被所有实例共享,这也就造成了,整个进程中的所有AsyncTask都是串行执行。这个需要注意。
举个例子
//
new MyAsyncTask().execute("MyAsyncTask 1");
new MyAsyncTask().execute("MyAsyncTask 2");
new MyAsyncTask().execute("MyAsyncTask 3");
new MyAsyncTask().execute("MyAsyncTask 4");
new MyAsyncTask().execute("MyAsyncTask 5");
class MyAsyncTask extends AsyncTask<String,Integer,String>{
@Override
protected String doInBackground(String... strings) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return strings[0];
}
@Override
protected void onPostExecute(String s) {
super.onPostExecute(s);
SimpleDateFormat dateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Log.d("AAA",s+" 执行结束 :"+dateFormat.format(new Date()));
}
}
//====================== 执行结果 ====================
MyAsyncTask 1 执行结束 :2017-11-08 10:37:29
MyAsyncTask 2 执行结束 :2017-11-08 10:37:31
MyAsyncTask 3 执行结束 :2017-11-08 10:37:33
MyAsyncTask 4 执行结束 :2017-11-08 10:37:35
MyAsyncTask 5 执行结束 :2017-11-08 10:37:37
枚举Status
public enum Status {
/**
* Indicates that the task has not been executed yet.
*表明尚未执行的任务。
*/
PENDING,
/**
* Indicates that the task is running.
*表明任务正在执行。
*/
RUNNING,
/**
* Indicates that {@link AsyncTask#onPostExecute} has finished.
*表明任务已经结束。
*/
FINISHED,
}
在executeOnExecutor方法中,先判断了当前的状态是不是不等于PENDING,如果不是就会抛异常,这也说明了为什么一个对象只能execute一次,把状态改为RUNNING,开始执行onPreExecute()方法。
将我们传入的params赋值给mWorker对象,再把mFuture给sDefaultExecutor串行线程池串行执行,这里也是比较绕的一部分。
首先,我们先看mWorker对象。
在构造方法中
public AsyncTask(@Nullable Looper callbackLooper) {
...
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
//当前任务已经被调用了,
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//执行doInBackground函数
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
...
}
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
mWoker是一个实现Callable接口WorkerRunnable对象,将传入的params赋值给mWorker对象的mParams,在call方法中,先将mTaskInvoked设置为true,表示当前任务已经被调用,然后执行doInBackground(mParams),最后在finally中将返回的结果result交给postResult执行。
//通过handler 发送消息
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// 调用AsyncTask的finish方法。
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private void finish(Result result) {
if (isCancelled()) {
//如果被取消,则调用onCancelled
onCancelled(result);
} else {
//否则调用onPostExecute
onPostExecute(result);
}
//更改状态
mStatus = Status.FINISHED;
}
这里面其实差不多就跑通了整个AsyncTask的关键方法的调用逻辑了,通过handler发送消息,然后执行onPostExecute。
问题是,mWorker的call方法是什么时候调用的 ?
mFuture对象
private final FutureTask<Result> mFuture;
public AsyncTask(@Nullable Looper callbackLooper) {
...
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
FutureTask 是一个并发类,它实现了RunnableFuture接口,而RunnableFuture又实现了
Runnable, Future接口。
public class FutureTask<V> implements RunnableFuture<V> {
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
在构造方法中,将mWokder传了进去,其实是给内部的callable赋值。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
当我们调用execute方法时–最终调用了 exec.execute(mFuture)。
这里时把mFuture做为Runnable的作用来执行它的run()方法。
public void run() {
...
try {
//成员变量callable,在构造方法中赋值了
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
在执行mFuture的run方法时,就会执行mWorker的call方法,接着就巴拉巴拉完成了整个操作。
这里说一下mFuture的done方法是什么时候执行的。
调用run方法如果成功执行,最终都会执行到done方法。
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
protected void setException(Throwable t) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t;
U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
finishCompletion();
}
}
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
/**
* 删除和发信号给所有等待结果的线程
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//在这里执行done()方法。
done();
callable = null;
}
在这done方法中,AsyncTask做了一个保险操作。
如果mWorker call方法失败了,就会来到这里,同样调用了postResult方法
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
}
}
}
private void postResultIfNotInvoked(Result result) {
//
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
还有一个publishProgress方法,其实也是通过handler来实现的。
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
至此,AsyncTask的工作原理大致分析完毕。
这里面还涉及双端队列ArrayDeque,线程池以及并发类FeatureTask都是值得深究和学习的。
这里还学到了招,如何串行线程池。
抽一下代码
public class SerialExecutor {
private Runnable mActive;
private ArrayDeque<Runnable> mArrayDeque = new ArrayDeque<>();
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE = 1;
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingDeque<>(128);
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Serial thread #" + mCount.getAndIncrement());
}
};
private static final ThreadPoolExecutor THREAD_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
public synchronized void execute(final Runnable r) {
mArrayDeque.offer(new Runnable() {
@Override
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
// 第一次入队列时mActivie为空,因此需要手动调用scheduleNext方法
if (mActive == null) {
scheduleNext();
}
}
private void scheduleNext() {
if ((mActive = mArrayDeque.poll()) != null) {
THREAD_EXECUTOR.execute(mActive);
}
}
}
测试一下
public static void main(String[] args) {
SerialExecutor serialExecutor = new SerialExecutor();
for (int i = 0; i < 5; i ++) {
final int j = i;
serialExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("The num is :" + (j + 1));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
执行结果
The num is :1
The num is :2
The num is :3
The num is :4
The num is :5
完美。。。