【Android】异步加载 AsyncTask 源码解析
: May 27, 2016
: Android
: onlylemi/notes/tree/master/Android
轻量级的异步加载框架。内部实现是以线程池的形式进行。
AsyncTask
官方文档:https://developer.android.com/reference/android/os/AsyncTask.html
翻译:https://github.com/onlylemi/android-source-code-translate/blob/master/android/os/AsyncTask.java
基本用法
它是一个抽象类,需要继承它,同时定义三个参数
声明
public abstract class AsyncTask<Params, Progress, Result>{
}
- Params —— 执行时要传入的参数类型(例如:下载是传个url地址)
- Progress —— 后台任务执行中进度的类型(例如:传个Integer作为进度值)
- Result —— 异步任务执行结束后的返回值类型(例如:下载图片时返回Bitmap)
基本方法
- onPreExecute() —— 在执行异步任务之前运行在主线程,在
executeOnExecutor()
方法中被调用 - doInBackground(Params… p) —— 异步任务执行的子线程中运行,在
mWorker
对象的call()
中被调用 - onProgressUpdate(Progress… p) ——
这个方法默认不会调用
一般在doInBackground()
方法中调用publishProgress(Progress... p)
方法,通过消息机制回调到该方法。
以下这两个方法只会被调用一个,任务正常完成调用第一个,如果中途被取消,调用第二个
- onPostExecute(Result… r) —— 该方法在异步任务执行完之后调用,运行在主线程中
- onCancelled() —— 当任务被 cancale(true) 的时候
以上四个方法在代码中都不要自己去调用。
示例
下载一张网络图片为例,进行说明。源代码可查看 https://github.com/onlylemi/AndroidTest/tree/master/Test2_AsyncTask,这里展示主要部分。
主Activity
public class MainActivity extends AppCompatActivity {
private ImageView imageView;
private TextView textView;
private MyAsyncTask asyncTask;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 显示下载后的图片
imageView = (ImageView) findViewById(R.id.inage_view);
// 显示图片下载的进度
textView = (TextView) findViewById(R.id.text_view);
String urlStr = "http://ww2.sinaimg.cn/large/610dc034gw1f4hvgpjjapj20ia0ur0vr.jpg";
asyncTask = new MyAsyncTask(imageView, textView);
asyncTask.execute(urlStr);
}
}
MyAsyncTask类
public class MyAsyncTask extends AsyncTask<String, Integer, Bitmap> {
private static final String TAG = MyAsyncTask.class.getSimpleName();
private ImageView imageView;
private TextView textView;
public MyAsyncTask(ImageView imageView, TextView textView) {
this.imageView = imageView;
this.textView = textView;
}
@Override
protected void onPreExecute() {
super.onPreExecute();
textView.setText("0");
Log.i(TAG, "onPreExecute");
}
@Override
protected Bitmap doInBackground(String... params) {
Log.i(TAG, "doInBackground");
String urlStr = params[0];
Bitmap bitmap = null;
InputStream in = null;
ByteArrayOutputStream out = null;
HttpURLConnection conn = null;
try {
out = new ByteArrayOutputStream();
URL url = new URL(urlStr);
conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setConnectTimeout(3000);
conn.connect();
if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
in = conn.getInputStream();
//bitmap = BitmapFactory.decodeStream(in);
int allLength = conn.getContentLength();
int currentLength = 0;
byte[] buffer = new byte[1024];
int len;
while ((len = in.read(buffer)) != -1) {
out.write(buffer, 0, len);
currentLength += len;
// 设置进度,这个方法执行后会执行 onProgressUpdate(Integer... values)
publishProgress(currentLength / allLength * 100);
}
bitmap = BitmapFactory.decodeByteArray(out.toByteArray(), 0, allLength);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (null != in) {
in.close();
}
if (null != out) {
out.close();
}
if (null != conn) {
conn.disconnect();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return bitmap;
}
@Override
protected void onProgressUpdate(Integer... values) {
super.onProgressUpdate(values);
Log.i(TAG, "onProgressUpdate");
// 更新进度
if (null != textView) {
textView.setText(values[0].toString());
}
}
@Override
protected void onPostExecute(Bitmap bitmap) {
super.onPostExecute(bitmap);
Log.i(TAG, "onPostExecute");
// 设置图片
if (null != imageView && null != bitmap) {
imageView.setImageBitmap(bitmap);
}
}
@Override
protected void onCancelled(Bitmap bitmap) {
Log.i(TAG, "onCancelled");
}
}
以上就是 AsyncTask 的一个简单用法,接下来看他的内部实现。
源码解析
当我们在使用时,先 new 一个 MyAsyncTask
的引用时,会调用 AsyncTask
的构造函数
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
在构造函数中就初始化了一个 mWorker(WorkerRunnable),其实 WorkerRunnable 就是一个 Callable,但是 mParams 并没有被初始化,同时初始化了一个 mFuture(FutureTask) 对象,并把 mWorker 作为参数。
// 主线程中就是通过这个方法启动异步任务进行执行,
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
// 默认传递 sDefaultExecutor,它是一个 SERIAL_EXECUTOR,表示串行执行
return executeOnExecutor(sDefaultExecutor, params);
}
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
// 设置为运行状态
mStatus = Status.RUNNING;
// 调用了第一个方法,初始准备
onPreExecute();
// 这是才把参数设置给 mWorker 对象(初始化时没有设置)
mWorker.mParams = params;
// 将 FutureTask 交给 SerialExecutor 的 execute 方法
exec.execute(mFuture);
return this;
}
以上可以看出,有个枚举类型的 Status,表示当前任务的一个执行状态,execute() 只能被调用一次,被再次调用回报异常
- PENDING —— 待定状态。默认为此状态,
mStatus = Status.PENDING
- RUNNING —— 运行状态。调用了
execute
方法之后,在executeOnExecutor
方法中设置为此状态- FINISHED —— 完成状态。在任务完成之后,handler发送消息,调用
finish
方法中设置
程序中调用 exec.execute(mFuture);
之后,就开始执行任务,由于 exec 是SERIAL_EXECUTOR,也就是一个 SerialExecutor
类对象,看这个类实现
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
可以看到,先把 FutureTask 插入到任务队列 tasks 中执行,如果这个时候没有正在活动的 AsyncTask 任务,那么就会执行下一个 AsyncTask 任务,同时当一个 AsyncTask 任务执行完毕之后,AsyncTask 会继续执行其他任务直到所有任务都被执行为止。从这里就可以看出,默认情况下,AsyncTask是串行执行的。这样就会调用 FutureTask 的 run() 方法,然后其中又会调用 mWork 的 call() 方法,也就是在构造函数中的实现,call() 方法中就会调用 doInBackground() 并把结果给了 postResult(result)。
关于 SerialExecutor,是串行执行的,按加入的顺序依次执行。容量没有限制
- 里面有一个Runnable队列:mTask
- 两个方法execute(runnable) 和 scheduleNext(),都是加了同步锁的
- 每执行完execute()后会调用scheduleNext()方法
- scheduleNext()方法如果发现mTask还不为null,则调用execute()方法执行。
- SerialExecutor类在执行的时候仍然是在调用ThreadPoolExecutor来执行Runnable
private Result postResult(Result result) {
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
private static Handler getHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler();
}
return sHandler;
}
}
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
可以看到这个方法通过获取一个 handler 去发送消息 MESSAGE_POST_RESULT。InternalHandler 类中重写了 handleMessage() 方法来处理消息,调用到 finish() 方法,判断当前任务是否被取消,没有取消就会调用 onPostExecute(result) 方法了;如果被取消,则可以通过重写 onCancelled() 方法,执行相关的操作。这样整个任务也就执行结束了。
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
而 MESSAGE_POST_PROGRESS 消息是在,用户调用了 publishProgress() 之后才调用的,这样也就调用了 onProgressUpdate() 方法,来更新任务进度。
public final boolean cancel(boolean mayInterruptIfRunning) {
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);
}
关于调用cancel()方法
- 它会调用内部FutureTask对象的cancel()方法
- 并不会立即终止异步任务的执行
- 异步任务会在doInBackground()方法执行完后结束,不去执行postExecute(),而执行onCancle()
FutureTask
可以取消的异步运算。
public class FutureTask<V> implements RunnableFuture<V>{}
public interface RunnableFuture<V> extends Runnable, Future<V>{}
public interface Future<V>{}
public interface Runnable<V>{}
- 它实现了两个接口:Runnable 和 Future
- 两个构造方法:
- FutureTask(Callable
callable) - FutureTask(Runnable runnable, V result)
- FutureTask(Callable
- 既可以作为Runnable,被线程执行
- 又可以作为Future, 得到Callable的值
由于FutureTask也实现了Runnable接口所以它可以提交给Executor来执行,或者用它来作为new Thread(Runnable)的参数。
public interface Runnable {
public void run();
}
public interface Callable<V> {
V call() throws Exception;
}
Callable 和 Runnable
- Callable 有返回值,抛出异常
- Runnable 无返回值,不抛出异常
THREAD_POOL_EXECUTOR
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE = 1;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
- 继承了Executor
- 每次并发执行5个线程
- 线程池中最多允许128个线程,多了抛出非法状态异常。
Thread_Pool_Executor是并行执行的,每次执行5个线程,超过五个的放在等待队列中。队列长度不能超过128个。
如果我的文章对您有所帮助,就请我喝杯咖啡吧^^
Messages