Kotlin | 深入理解协程

@[toc]

Kotlin协程

协程由程序自己创建和调度,不需要操作系统调度,所以协程比线程更加轻量。相比于线程的切换,协程切换开销更小,速度更快
我们知道线程是CPU调度的基本单位,而协程不能独立存在,必须依赖于线程。在Kotlin中,Dispatcher(内部是线程池或者Android主线程)是调度器,可以调度协程运行在一个或多个线程之中。一个线程中可以有多个协程,同一个协程可以运行在一个线程的不同时刻;多个协程可以运行在一个或多个线程的不同时刻。进程、线程、协程三者的关系:

  • 进程:线程 = 1:N
  • 线程:协程 = 1:N

协程优势:

  • 更安全的代码kotlin中提供了许多语言功能,避免Java中最常见的null空指针等异常
  • 语法简洁、富有表现力:相比于Javakotlin可以使用更少的代码实现更多的功能。
  • 可互操作:与Java语言无缝互通。即可以在 kotlin 代码中调用Java代码,同时也可以在Java代码中调用kotlin代码。kotlin代码本质上也是通过kotlin编译器编译后生成VM能识别的字节码。
  • 结构化并发使用看似阻塞式的写法来实现异步功能。即可以同步的方式去写异步代码,相比于回调方式大幅简化了后台任务管理,例如网络请求、数据库访问等任务的管理。

非阻塞式挂起

协程在常规函数的基础上添加了两项操作,用于处理长时间运行的任务。在 invoke(或call) return 之外,协程添加了 suspend非阻塞式挂起 和 resume恢复:

  • suspend 用于暂停执行当前协程,并保存所有局部变量。
  • resume 用于让已挂起的协程从挂起处恢复执行。

注意suspend 函数不能在普通函数中调用,否则会报Suspend function 'xxx' should be called only from a coroutine or another suspend function 的提示;如需调用 suspend 函数,只能从其他 suspend 函数进行调用,或通过使用协程构建器(例如 launch)来启动新的协程。
suspend 函数相比于普通函数内部多了一个 Continuation 续体实例(Kotlin转成Java代码之后即可看到),suspend函数中可以调用普通函数,但普通函数却不能调用suspend函数

 suspend fun fetchDocs() {                             // Dispatchers.Main
    val result = get("https://developer.android.com") // Dispatchers.IO for `get`
    show(result)                                      // Dispatchers.Main
}
suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }

suspend 修饰的函数为非阻塞式挂起函数,何为非阻塞式挂起呢?不同于Java 中的Thread.sleep() 会阻塞当前线程,suspend 修饰的函数当遇到启动子线程操作时,会在切线程时将协程挂起并记录当前挂起点,接着主线程暂停了当前协程,但主线程依然可以去执行协程之外逻辑而不会被阻塞;此时协程中启动的子线程也可以继续执行(相当于兵分两路,互不影响),当子线程中的逻辑执行完毕后,会自动从协程的挂起点恢复,这样就可以继续在主线程往下执行了。整体流程如下:执行suspend函数 -> 启动子线程 -> 函数挂起并记录挂起点 -> 协程暂停(主线程依然是活跃的) -> 子线程执行完毕 -> 协程从挂起点恢复
如在上面的示例中,get() 在主线程中调用,内部在它启动网络请求之前挂起协程。虽然协程被挂起,但主线程并没有被阻塞,此时如果主线程中收到其他消息事件依然可以去处理(如点击事件等)。当网络请求完成时,get 会恢复已挂起的协程,继续执行show(result) 方法。这里注意一下,挂起函数并不一定会导致协程挂起,只有在发生异步调用时才会挂起
Kotlin 使用 堆栈帧 管理要运行哪个函数以及所有局部变量。挂起协程时,系统会复制并保存当前的堆栈帧以供稍后使用。恢复时,会将堆栈帧从其保存位置复制回来,然后函数再次开始运行。即使代码可能看起来像普通的顺序阻塞请求,协程也能确保网络请求避免阻塞主线程。

CPS变换 + Continuation续体 + 状态机

CPS变换 (Continuation-Passing-Style Transformation) 是一种编程风格, 用来将内部要执行的逻辑封装到一个闭包里面, 然后再返回给调用者。上一节的例子中,协程就是通过传递 Continuation 来控制异步调用流程的:将函数挂起之后执行的逻辑包装起成一个Continuation, 里面包含了挂起点信息,这样当协程恢复时就可以在挂起点继续执行
Continuation意为续体,只存在于挂起函数中。Kotlin 中当一个普通函数加上suspend 关键字之后,就成为了挂起函数,如:

private suspend fun suspendFuc() {}

我们对此函数进行反编译,查看对应的Java代码为:

private final Object suspendFuc(Continuation $completion) {
      return Unit.INSTANCE;
}

可以看到Java代码中系统帮我们多生成了一个Continuation 类型的参数,看下这个参数内部:

@SinceKotlin("1.3")
public interface Continuation {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext
    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result)
}

Continuation 是一个接口类型,表示在返回T类型值的挂起点之后的延续,其中类型T代表的是原来函数的返回值类型。resumeWith 恢复执行相应协程,并传递一个结果作为最后一个挂起点的返回值。

  • 状态机

上述函数中加入一个delay 函数,delay 函数本身也是一个挂起函数:

    private suspend fun suspendFuc(): String {
        delay(1000)
        return "value"
    }

经过Tools -> Kotlin -> Show Kotlin Bytecode 反编译查看:

   private final Object suspendFuc(Continuation var1) {
      Object $continuation;
      label20: {
         if (var1 instanceof ) {
            $continuation = ()var1;
            if (((()$continuation).label & Integer.MIN_VALUE) != 0) {
               (()$continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }

          //将要执行的Continuation逻辑传入ContinuationImpl中
         $continuation = new ContinuationImpl(var1) {
            // $FF: synthetic field
            Object result;
            int label;
             //invokeSuspend()会在恢复协程挂起点时调用
            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return CoroutineBaseFragment.this.suspendFuc(this);
            }
         };
      }
      Object $result = (()$continuation).result;
      //返回此状态意味着函数要被挂起
      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();

      // 状态机逻辑,通过label进行分块执行
      switch((()$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         (()$continuation).label = 1;
         if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
            return var4;
         }
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }
      return "value";
   }

suspend挂起函数经过Kotlin编译器编译之后会进行CPS变换,并且函数里的逻辑会进行分块执行:分块逻辑数量 = 挂起函数数量 + 1,上述函数逻辑中,通过switch(label){} 状态机将逻辑分块执行,首先label = 0,此时会先将label置为1,接着调用了delay挂起函数,返回Intrinsics.COROUTINE_SUSPENDED意味着函数要被挂起;
当函数从协程挂起点恢复执行时,会调用$continuation#invokeSuspend(Object $result) 方法恢复执行,$result是上一次函数执行的结果,以参数形式带入之后,就可以从挂起点继续执行了。
此时label变为1ResultKt.throwOnFailure($result)中通过 if (value is Result.Failure) throw value.exception判断上面的分块结果是否有异常,如果有直接抛异常;否则直接break,执行到最后的return "value" 逻辑,这样整个方法的流程就运行完毕了。
总结:suspend挂起函数的执行流程就是通过CPS变换 + Continuation + 状态机来运转的

CoroutineContext

我们知道 Android 中的 Context(子类有Application、Activity、Service)可以获取应用资源,可以启动ActivityService等。CoroutineContext意为协程上下文,其作用可以类比 Context,通过它可以控制协程在哪个线程中执行、捕获协程异常、设置协程名称等。

public interface CoroutineContext {
    //从该上下文中返回具有给定[key]的元素 或 null 
    public operator fun  get(key: Key): E?
    //从initial开始累加上下文中的条目,并从左到右对当前累加器值和上下文中的每个元素应用operation
    public fun  fold(initial: R, operation: (R, Element) -> R): R
    //返回一个上下文,其中包含来自此上下文的元素和来自其他上下文context的元素。具有相同键的元素会被覆盖。
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }
    //删除带有指定[key]的元素。
    public fun minusKey(key: Key<*>): CoroutineContext
    /**
     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
     */
    public interface Key
    //CoroutineContext的一个元素。协程上下文的一个元素本身就是一个单例上下文。
    public interface Element : CoroutineContext {
        /**
         * A key of this coroutine context element.
         */
        public val key: Key<*>
        public override operator fun  get(key: Key): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null
        public override fun  fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)
        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

协程总是会运行在 CoroutineContext 类型的上下文中,其中plus操作符被重写,多个CoroutineContext相加时,会生成一个CombinedContext( 内部可以理解成一个Element都不同的链表);同时 CombinedContext拥有 map索引能力,集合中的每个元素都有一个唯一的 Key
#### 继承关系

Element 定义在CoroutineContext内部,是它的内部接口。CoroutineContext 使用以下元素集定义协程的行为:

  • Job:控制协程的生命周期。
  • CoroutineDispatcher:将工作分派到适当的线程。
  • CoroutineName:协程的名称,可用于调试。
  • CoroutineExceptionHandler:处理未捕获的异常。

下面就来详细看看几种元素的使用。

CoroutineContext几种具体实现

1、Job & SupervisorJob

Job是协程的句柄。使用launchasync创建的协程都会返回一个Job实例,该实例是对应协程的唯一标识并可以管理协程的生命周期。如:

val job1 = GlobalScope.launch { }
val job2 = MainScope().launch { }
override fun onDestroy() {
    super.onDestroy()
    job1.cancel()
    job2.cancel()
}

注:上面的协程启动方式并不推荐在项目中直接使用,因为生命周期比较长,如果没有主动关闭可能就会产生内存泄漏。推荐在ViewModel中使用viewModelScopeLifecycleOwner中使用lifecycleScope,可以在各自的生命周期中自动关闭协程。
Job常用的API

  val job = GlobalScope.launch(
      context = Job() + Dispatchers.Main,
      start = CoroutineStart.LAZY) { // 逻辑部分
   }

  -  job.start()  // 对应 start = CoroutineStart.LAZY
  -  job.cancelAndJoin()  //cancel() + join()
  -  job.cancel() // 取消
  -  job.isActive // 协程是否存活
  -  job.isCancelled // 协程是否被取消
  -  job.isCompleted //协程是否已经执行完毕

Job 有以下状态:

StateisActiveisCompletedisCancelled
New (optional initial state)falsefalsefalse
Active (default initial state)truefalsefalse
Completing (transient state)truefalsefalse
Cancelling (transient state)falsefalsetrue
Cancelled (final state)falsetruetrue
Completed (final state)falsetruefalse

状态流转:

                                          wait children
    +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
    | New | -----> | Active | ---------> | Completing  | -------> | Completed |
    +-----+        +--------+            +-------------+          +-----------+
                     |  cancel / fail       |
                     |     +----------------+
                     |     |
                     V     V
                 +------------+                           finish  +-----------+
                 | Cancelling | --------------------------------> | Cancelled |
                 +------------+                                   +-----------+

SupervisorJob的使用

val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }
GlobalScope.launch(exceptionHandler) {
   //子Job1
    launch(SupervisorJob()) {
        delay(200)
        throw NullPointerException()
     }
   //子Job2
    launch {
        delay(300)
        log("child2 execute successfully")
    }
    //子Job3
    launch {
        delay(400)
        log("child3 execute successfully")
    }
    log("parent execute successfully")
}

注意子Job1传入的是 SupervisorJob,当发生异常时,兄弟协程(job2/job3)不会被取消;如果是默认配置,那么兄弟协程也会被取消,上述代码执行结果:

2022-08-30 23:31:30.142  E/TTT: parent execute successfully
2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException
2022-08-30 23:31:30.446  E/TTT: child2 execute successfully
2022-08-30 23:31:30.545  E/TTT: child3 execute successfully

如果将job1中的更换为默认时,执行结果为:

2022-08-30 23:31:30.142  E/TTT: parent execute successfully
2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException

可以看到job1中使用默认设置时,当job1发生了异常,兄弟协程也会被取消。注意这里job2/job3可以被取消是因为delay()中有对取消状态的判断,通知job2/job3取消可以类比Thread.interrupt(),interrupt只是通知线程要中断并设置一个中断状态,最终要不要中断还是线程自己说了算。所以如果改成以下代码,即使job1使用默认配置,job3也不会被取消:

//子Job3
 launch(Dispatchers.IO) {
     Thread.sleep(400)
     log("child3 execute successfully")
 }

job3中没有了对协程状态的判断,所以即使job3被通知要取消协程了,依然会继续执行直到结束。
SuperviorJob源码浅析

public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)
public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)
private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
    override fun childCancelled(cause: Throwable): Boolean = false
}

当子协程 job1/job2/job3 启动时,会和协程本身的 Job 形成父子关系。而当子协程抛异常时,父JobchildCancelled() 方法会被执行,且默认返回的是 true,表示取消 父Job 及其所有 子Job;如果 子Job中使用的是 SuperviorJobchildCancelled() 返回的是 false,表示 父Job及其未发生异常的 子Job都不会受影响。

2、CoroutineDispatcher
public actual object Dispatchers {
   @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

将工作任务分派到适当的线程。

  • Dispatchers.Main:运行在UI线程中。Dispatchers.Main.immediate: 如果在UI线程加载,不会做特殊处理;如果是在子线程,会通过Handler转发到主线程
  • Dispatchers.IO: 执行IO密集型任务,最多提交max(64, CPU核心数)个任务执行。
  • Dispatchers.DEFAULT :执行CPU密集型任务, CoroutineScheduler最多有corePoolSize个线程被创建,corePoolSize的取值为max(2, CPU核心数),即它会尽量的等于CPU核心数
  • Dispatchers.Unconfined:不给协程指定运行的线程,由启动协程的线程决定;但当被挂起后, 会由恢复协程的线程继续执行。内部通过ThreadLocal保存执行协程时对应的线程,用于恢复协程时在取出对应线程并在其继续执行协程。
    withContext

withContext() 可以在不引入回调的情况下控制任何代码行的线程池,因此可以将其应用于非常小的函数,例如从数据库中读取数据或执行网络请求。一种不错的做法是使用 withContext() 来确保每个函数都是主线程安全的,这意味着,可以从主线程调用每个函数。这样,调用方就从不需要考虑应该使用哪个线程来执行函数了。

3、CoroutineName

协程的名称,可用于调试。

4、CoroutineExceptionHandler
public interface CoroutineExceptionHandler : CoroutineContext.Element {

    public companion object Key : CoroutineContext.Key
    //当有未处理的异常发生时,该方法就会执行
    public fun handleException(context: CoroutineContext, exception: Throwable)
}

CoroutineExceptionHandler当有未捕获的异常时就会触发执行。如果不在顶级协程中设置,那么当有异常发生时会导致程序的crash,可以通过自定义CoroutineExceptionHandler来拦截异常,如下:

val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }
lifecycleScope.launch(exceptionHandler) { // do something }              

自定义CoroutineExceptionHandler调用的是如下方法:

public inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =
    object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
        override fun handleException(context: CoroutineContext, exception: Throwable) =
            handler.invoke(context, exception)
    }

所以当发生异常时,会在handleException()中执行我们传入的handler(CoroutineContext, Throwable)方法。
CoroutineExceptionHandler使用示例

 val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("parent throwable:$throwable") }
 lifecycleScope.launch(exceptionHandler) {
     log("parent execute start")
     val childExHandler =
                    CoroutineExceptionHandler { context, throwable -> log("child throwable:$throwable") }
     //子协程中如果使用SupervisorJob()、Job(),则异常不会往上传播;否则异常会在顶层协程中处理
     val childJob = launch(childExHandler) {
         delay(1000)
         log("child execute")
         throw  IllegalArgumentException("error occur")
     }
     childJob.join()
     log("parent execute end")
 }

执行结果:

2022-09-04 00:38:51.792 15415-15415/ E/TTT: parent execute start
2022-09-04 00:38:52.796 15415-15415/ E/TTT: child execute
2022-09-04 00:38:52.805 15415-15415/ E/TTT: parent throwable:java.lang.IllegalArgumentException: error occur

可见虽然在子协程中也自定义了CoroutineExceptionHandler,但是最终子协程中的异常还是在顶层的父协程种处理的,如果就想在子协程中处理异常呢?可以在子协程中加上SupervisorJob()、Job(),则异常会在子协程中处理而不会往上传播:

 //其他内容不变
 val childJob = launch(childExHandler + SupervisorJob()) {
     //其他内容不变
 }

执行结果:

2022-09-04 00:42:48.465 15755-15755/ E/TTT: parent execute start
2022-09-04 00:42:49.468 15755-15755/ E/TTT: child execute
2022-09-04 00:42:49.473 15755-15755/ E/TTT: child throwable:java.lang.IllegalArgumentException: error occur
2022-09-04 00:42:49.474 15755-15755/ E/TTT: parent execute end

可见异常最终是在子协程中处理的,且虽然子协程中发生了异常,父协程依然能执行完毕。
:如果启动协程的是async,则CoroutineExceptionHandler中的handler方法并不会马上执行,必须调用deffered.await()时才会执行。

CoroutineScope 协程作用域

CoroutineScope 会跟踪它使用 launchasync 创建的所有协程。CoroutineScope 本身并不运行协程,但是通过CoroutineScope可以保证对协程进行统一管理,避免发生内存泄漏等,
常见的CoroutineScope

  • GlobalScope: 全局协程作用域,如果不主动通过job.cancel()关闭,其生命周期与Application一致。
  • MainScope: MainScope的内部:public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main),可以看到MainScope其实是通过SupervisorJob() + Dispatchers.Main自定义的协程作用域,其内部运行在UI线程且内部异常不会往上传播。
  • runBlocking: 顶层函数,和 GlobalScope 不一样,它会阻塞当前线程直到其内部所有子协程执行结束;
  • ktx扩展库中的自定义GlobalScope:典型应用就是在LifecycleOwner(androidX中的FragmentActivity、Fragment都实现了该接口)使用的lifecycleScope,以及ViewModel中使用的ViewModelScope,其内部可以在合适的时机自动关闭协程,从而避免内存泄露的发生。

参考

【1】官网:利用 Kotlin 协程提升应用性能
【2】官网:Android 上的 Kotlin 协程
【3】Kotlin挂起函数原理
【4】揭秘Kotlin协程中的CoroutineContext

文章来源于互联网:Kotlin | 深入理解协程

阅读全文
下载说明:
1、本站所有资源均从互联网上收集整理而来,仅供学习交流之用,因此不包含技术服务请大家谅解!
2、本站不提供任何实质性的付费和支付资源,所有需要积分下载的资源均为网站运营赞助费用或者线下劳务费用!
3、本站所有资源仅用于学习及研究使用,您必须在下载后的24小时内删除所下载资源,切勿用于商业用途,否则由此引发的法律纠纷及连带责任本站和发布者概不承担!
4、本站站内提供的所有可下载资源,本站保证未做任何负面改动(不包含修复bug和完善功能等正面优化或二次开发),但本站不保证资源的准确性、安全性和完整性,用户下载后自行斟酌,我们以交流学习为目的,并不是所有的源码都100%无错或无bug!如有链接无法下载、失效或广告,请联系客服处理!
5、本站资源除标明原创外均来自网络整理,版权归原作者或本站特约原创作者所有,如侵犯到您的合法权益,请立即告知本站,本站将及时予与删除并致以最深的歉意!
6、如果您也有好的资源或教程,您可以投稿发布,成功分享后有站币奖励和额外收入!
7、如果您喜欢该资源,请支持官方正版资源,以得到更好的正版服务!
8、请您认真阅读上述内容,注册本站用户或下载本站资源即您同意上述内容!
原文链接:https://www.dandroid.cn/archives/19807,转载请注明出处。
0

评论0

显示验证码
没有账号?注册  忘记密码?