android协程是怎么切换线程的

要搞清楚这个问题,我觉得需要搞懂这几个知识点:

  1. Continuation,简单讲可以把它看成是Callback,回调。当协程调用suspend函数,协程会被挂起,当suspend函数执行完成后,会通过Continuation的resumeWith方法,将执行结果返回给协程让协程继续执行。
  2. ContinuationInterceptor顾名思义是Continuation拦截器,也就是Callback拦截器,它的作用就是让回调在指定的线程中执行。假设有这样一个场景,在主线程中开启协程,在子线程中执行耗时操作,当耗时操作执行完毕时,它需要执行回调方法,而回调是需要在主线程中执行的,协程框架内部在协程开启的时候就会通过拦截器将回调与主线程绑定在一起,让回调一定在主线程中执行
  3. CoroutineDispatcher是拦截器的子类,除了拥有拦截器的功能之外,它还有两个重要作用,isDispatchNeeded(context:CoroutineContext)决定回调是否需要分发到其它线程中执行, dispatch(context: CoroutineContext, block: Runnable)将回调分发到指定线程中执行
  4. ThreadContextElement处理协程中的线程的ThreadLocal相关的变量。ThreadLocal很好理解,就是线程私有变量,只能被当前线程访问。那么协程中为什么会有这ThreadLocal呢?举个例子,在同一个线程中执行两个协程,将协程命名为A、B,在执行协程时打印出协程的名称。因为是同一个线程,而且协程的名称是存储在ThreadLocal中的。所以在协程执行的时候,需要将ThreadLocal中保存协程名称的变量修改为当前协程的名称,协程执行完毕时,将变量重置。

首先通过一个例子来讲解Continuation

fun main() = runBlocking(Dispatchers.Main) { // 花括号中是Continuation
    suspendNoChangeThread()
    suspendChangeToIOThread()
    normalFunc()
}

fun normalFunc() {
    // do something
}

suspend fun suspendNoChangeThread() {
    suspendCoroutine<Unit> {
        it.resume(Unit)
    }
}

suspend fun suspendChangeToIOThread(): String {
    return withContext(Dispatchers.IO) {
        Thread.sleep(1000)
        return@withContext "OK"
    }
}

我们聚焦到main函数。我们可以把整个函数体当成Continuation

// 命名为namedContinuation避免混淆
val namedContinuation = {
    suspendNoChangeThread()
    suspendChangeToIOThread()
    normalFunc()
}

整个函数都是在runBlocking(Dispatchers.Main)中的Dispatchers.Main主线程中执行的。我们看到函数体中有两个suspend的函数,而且suspendChangeToIOThread是切换到IO线程执行的,那么当它执行完,会紧接着执行normalFunc()方法,而该方法是需要在主线程中调用的。那么说明在suspendChangeToIOThread()和normalFunc()之间有一次切换线程的过程。

我们都知道suspend修饰的函数,编译器会加上Continuation参数的。所以我们可以把suspendChangeToIOThread()等价成:

fun suspendChangeToIOThread(continuation:Continuation):String {
    return withContext(Dispatchers.IO) {
            Thread.sleep(1000)
            return@withContext "OK"
        }
}

continuation需要在主线程中执行,那么continuation参数是什么时候与主线程绑在一起的呢?我们需要跟踪runBlocking的调用链。最终会调用到Callable.kt中的startCoroutineCancellable方法

/**
 * Use this function to start coroutine in a cancellable way, so that it can be cancelled
 * while waiting to be dispatched.
 */
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

intercepted()方法就是把一个continuation回调和回调所需要执行的线程绑定在一起

//ContinuationImpl.kt
public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

以上👆代码,通过协程上下文获取到ContinuationInterceptor,调用interceptContinuation(this)方法,生成DispatchedContinuation,很显然DispatchedContinuation是一个回调的同时,还通过CoroutineDispatcher指明回调在哪个线程中执行。

// CoroutineDispatcher.kt

 public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    

less

// DispatchedContinuation.kt

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) 

接下来我们通过isDispatchNeeded和dispatch方法是如何实现回调在指定线程中执行。

isDispatchNeeded判断是否有必要切换线程。假设当前在执行的函数所在的线程与回调目标线程是同一个线程,那就没必要切线程了,否则是需要切线程的。

我们以Android中的主线程为例,讲解

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        if (!handler.post(block)) {
            cancelOnRejection(context, block)
        }
    }
}

HandlerContext的功能就是协程Android库将线程切换到Handler所在的线程。一个特例,切换到主线程执行。

从isDispatchNeeded方法中我们可以看到,当前线程与目标线程不同时需要切换线程(Looper.myLooper() != handler.looper)。

通过dispatch方法,我们看到只需要执行handler.post(block)方法,就能把block切到指定线程中执行。

最后我们再简单讲解下withContext是如何实现线程切换的。

fun main() = runBlocking {
    withContext(Dispatchers.IO) {
        println("withContext " + coroutineContext[Job])
        delay(1000)
    }
    println("runBlocking" + coroutineContext[Job])
}

withContext内部会创建一个新的协程,而且会阻塞外部的协程。只有内部的协程执行完成后,外部的协程才会执行。所以我们看到的打印日志是 先打印withContext 然后再打印runBlocking。

看下源码,此处需要理解协程上下文知识,具体可以看下我之前写的文章

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // compute new context
        val oldContext = uCont.context

        val newContext = oldContext.newCoroutineContext(context)
        
        newContext.ensureActive()
        // FAST PATH #1 -- new context is the same as the old one
        if (newContext === oldContext) {
           // 省略代码
        }
        // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed) 
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            // 省略代码
        }
        // SLOW PATH -- use new dispatcher
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}

withContext源码将线程切换分成三种情况:

  1. 无需切换线程,而且协程上下文都没改变(newContext === oldContext)
  2. 无需切换线程,但是协程上下文其它的内容发现变化,比如说CorotineName发生变化了。会启动UndispatchedCoroutine,该协程会更新ThreadContextElement,因为线程没发生变化,只需要改变ThreadLocal中的内容即可
  3. 需要切换线程。启动DispatchedCoroutine,block.startCoroutineCancellable方法则最终会调用到intercepted(),后续流程与上文相同。

下面代码分别对应上述3个case:

fun main() = runBlocking {
    // case1 
    withContext(EmptyCoroutineContext) {
        println("withContext " + coroutineContext[Job])
        delay(1000)
    }

    // case2
    withContext(CoroutineName("newName")) {
        println("withContext " + coroutineContext[Job])
        delay(1000)
    }

    // case3
    withContext(Dispatchers.IO) {
        println("withContext " + coroutineContext[Job])
        delay(1000)
    }
    println("runBlocking" + coroutineContext[Job])
}
阅读全文
下载说明:
1、本站所有资源均从互联网上收集整理而来,仅供学习交流之用,因此不包含技术服务请大家谅解!
2、本站不提供任何实质性的付费和支付资源,所有需要积分下载的资源均为网站运营赞助费用或者线下劳务费用!
3、本站所有资源仅用于学习及研究使用,您必须在下载后的24小时内删除所下载资源,切勿用于商业用途,否则由此引发的法律纠纷及连带责任本站和发布者概不承担!
4、本站站内提供的所有可下载资源,本站保证未做任何负面改动(不包含修复bug和完善功能等正面优化或二次开发),但本站不保证资源的准确性、安全性和完整性,用户下载后自行斟酌,我们以交流学习为目的,并不是所有的源码都100%无错或无bug!如有链接无法下载、失效或广告,请联系客服处理!
5、本站资源除标明原创外均来自网络整理,版权归原作者或本站特约原创作者所有,如侵犯到您的合法权益,请立即告知本站,本站将及时予与删除并致以最深的歉意!
6、如果您也有好的资源或教程,您可以投稿发布,成功分享后有站币奖励和额外收入!
7、如果您喜欢该资源,请支持官方正版资源,以得到更好的正版服务!
8、请您认真阅读上述内容,注册本站用户或下载本站资源即您同意上述内容!
原文链接:https://www.dandroid.cn/archives/20220,转载请注明出处。
0

评论0

显示验证码
没有账号?注册  忘记密码?