Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?

Kotlin 协程教程

什么是协程

协程就像非常轻量级的线程。线程是由系统调度的,线程切换或线程阻塞的开销都比较大。而协程依赖于线程,但是协程挂起时不需要阻塞线程,几乎是无代价的,协程是由开发者控制的。所以协程也像用户态的线程,非常轻量级,一个线程中可以创建任意个协程。

协程的创建

线程的创建方式主要有两种, 继承Thread, 或者实现Runnable接口, 而协程而是通过构建器(coroutine builder方法), 有点类似工厂方法, 调用一个静态工厂方法就可以返回一个协程.
coroutine builder 方法有: launch,async,runBlocking等, 就像运行线程时, 给它指定一个线程池, 放在那种线程池中运行. 协程运行也需要指定一个运行环境和条件, 这里专业的名称是: 协程范围(CoroutineScope), 上面提到的构建器方法都是定义在它上的扩展函数. 它包含一个上下文(coroutineContext), 上下文是保存协程运行时需要的一些基础信息, 如名字, 调度器, 子协程等, 它的是类似 Map 的数据结构.

使用 构建器 创建协程, 注意构建器的使用方法:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // 创建并启动一个新的协程
        delay(1000L)
        println("World!")
    }
    println("Hello,") // 主线程中的代码会立即执行
    runBlocking {     // 启动一个协程, 但是这个表达式阻塞了主线程
        delay(2000L)  // 我们延迟 2 秒来保证 JVM 的存活
    }
}
Hello,
World!

等待与取消

线程通常是全局的, 运行时没有父子关系, 子线程完全独立于父线程运行. 如果要等待子线程运行结束, 需要调用它的join方法. 协程与之类似, 通过构建器方法创建并运行一个协程后, 返回一个job, 可以在这个job上调用join,cancel方法来等待或取消协程的运行.
另外注意调用了 runBlocking 的主线程会一直 阻塞 直到 runBlocking 内部的协程执行完毕。

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // 延迟一段时间
    println("main: I'm tired of waiting!")
    job.cancel() // 取消该作业
    job.join() // 等待作业执行结束
    println("main: Now I can quit.")
}
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

在Java 中, 一个执行长时间计算任务的线程, 如果没有周期性地检查一个是否取消执行的 Flag 变量, 那么它无法取消执行的, Kotlin 中协程也是这样的. 如果要使协程是可取消的, 那么必须要精心地设计, 如周期性地(如 for循环中)检查它是否存活的标记(可以认为是isActive变量, 其实它是一个可以被使用在CoroutineScope 中的扩展属性), 示例如下:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // 可以被取消的计算循环
            // 每秒打印消息两次
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // 等待一段时间
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消该作业并等待它结束
    println("main: Now I can quit.")
}

通常有这样一种需求, 启动一个大任务, 它是由许多小任务组成的, 当其中任何一个小任务失败或取消时, 取消其他所有的小任务以及大任务本身. 如果使用上面所说的手动join显然会很麻烦, 而且容易出错. 在协程中, 可以使用 结构化并发 这种机制来满足这种需求. 使用 作用域构建器 来构建一个协程运行的 作用域 , 所有在它内部创建或发起的子协程都从属于这个范围, 在所有已启动子协程执行完毕之前它不会结束.

以下代码演示了, 使用 作用域构建器 来构建一个自定义的 作用域 , 而不是协程构建器(coroutine builder)创建的默认作用域.

import kotlinx.coroutines.*

fun main() = runBlocking { // this: CoroutineScope 默认创建的作用域.
    launch {
        delay(200L)
        println("Task from runBlocking")
    }

    coroutineScope { // 显式地创建一个协程作用域
        launch {
            delay(500L)
            println("Task from nested launch")
        }

        delay(100L)
        println("Task from coroutine scope") // 这一行会在内嵌 launch 之前输出
    }

    println("Coroutine scope is over") // 这一行在内嵌 launch 执行完毕后才输出
}

// 输出:
// Task from coroutine scope
// Task from runBlocking
// Task from nested launch
// Coroutine scope is over

注意 : runBlockingcoroutineScope 的主要区别在于后者在等待所有子协程执行完毕时不会阻塞当前线程。runBlocking一般使用于测试代码中, 是为了方便写测试代码而设计的一个特殊方法.

挂起函数

可以把挂起函数理解为在协程内部调用的异步函数, 它带有suspend关键字.

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch { doWorld() }
    println("Hello,")
}

// 这是你的第一个挂起函数
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

清理

通常, 当协程被取消时, 需要做一些清理工作, 此时, 可以把协程中运行的代码用try {...} fininaly {...}块包住, 这样当协程被取消时, 会执行fininaly块中的清理工作. 但是fininaly 块中不能直接调用挂起函数, 否则会抛出CancellationException异常, 因为它已经被取消了, 而你又要在fininaly块中执行挂起函数把它挂起, 显然与要求矛盾. 然而, 现实的需求无奇不有, 如果非要这么做, 也不是不可以, 当你需要挂起一个被取消的协程,你可以将相应的代码包装在withContext(NonCancellable) {...} 中,并使用 withContext函数以及NonCancellable上下文,见如下示例所示:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {  // 重点注意这里
                println("job: I'm running finally")
                delay(1000L) // 这里调用了挂起函数!
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // 延迟一段时间
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消该作业并等待它结束
    println("main: Now I can quit.")
}
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
job: And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.

超时

有时需要监控一个协程运行了指定的时间后还没有结束就报告超时异常, 这时可以使用withTimeout()方法, 如:

import kotlinx.coroutines.*

fun main() = runBlocking {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}

指定1300毫秒后, withTimeout方法抛出了TimeoutCancellationException,它是CancellationException 的子类。如果不想抛出异常, 而只是想得到一个指示运行状态标志, 可以使用withTimeoutOrNull, 它不会抛出异常, 而是返回null.

import kotlinx.coroutines.*

fun main() = runBlocking {
    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // 在它运行得到结果之前取消它
    }
    println("Result is $result")
}

async 并发

上面使用的launch创建协程只能运行一个任务, 返回的是Job, 而不是业务相关的返回值. 为此, 可以使用async协程构建器, 它返回一个Deferred —— 一个轻量级的非阻塞future, 这代表了一个将会在稍后提供结果的 promise。你可以使用 .await() 在一个延迟的值上得到它的最终结果, 因为 Deferred 继承自 Job,所以如果需要的话,你可以取消它。

import kotlinx.coroutines.*
import kotlin.system.*

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")    
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 假设我们在这里做了些有用的事
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 假设我们在这里也做了些有用的事
    return 29
}

async 可以通过将 start 参数设置为 CoroutineStart.LAZY 而变为延迟启动的。 在这个模式下,只有结果通过 await 获取的时候或者在 Job 的 start 函数调用的时候协程才会启动。运行下面的示例:

val time = measureTimeMillis {
    val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
    val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
    // 执行一些计算
    one.start() // 启动第一个
    two.start() // 启动第二个
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")

async 风格的函数

为了简化客户端调用, 可以定义 async 风格的函数, 这样客户端不用管协程相关的概念, 它可以像调用普通函数一样调用异步风格的函数, 异步风格的函数就是一个普通函数:

// somethingUsefulOneAsync 函数的返回值类型是 Deferred<Int>
fun somethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne()
}

这个函数不是挂起函数。它可以在任何地方使用。 然而,它们总是在调用它们的代码中意味着异步(这里的意思是 并发 )执行。而客户端需要在之后的某个时间去取函数的计算结果, 而某个时间是无法确定的, 所以问题就来了, 应该怎么样取返回值呢? 如下所示:

// 我们可以在协程外面调用异步风格的函数
val one = somethingUsefulOneAsync()
// 但是必须启动一个协程来取结果: 调用它的 await() 方法
// 当我们等待结果的时候,这里我们使用 runBlocking { …… } 来阻塞当前线程
runBlocking {
    println("The answer is ${one.await()}")
}

async 的结构化并发

上一节中只有一个异步方法, 如果有多个, 并且有依赖关系的函数如何组成一个大函数呢? 如下:

import kotlinx.coroutines.*
import kotlin.system.*

// 注意,在这个示例中我们在 `main` 函数的右边没有加上 `runBlocking`
fun main() {
    val time = measureTimeMillis {
        // 我们可以在协程外面启动异步执行
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        // 但是等待结果必须调用其它的挂起或者阻塞
        // 当我们等待结果的时候,这里我们使用 `runBlocking { …… }` 来阻塞主线程
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}

fun somethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne()
}

fun somethingUsefulTwoAsync() = GlobalScope.async {
    doSomethingUsefulTwo()
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 假设我们在这里做了些有用的事
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 假设我们在这里也做了些有用的事
    return 29
}

这样的话, 会有潜在的问题, 如somethingUsefulOneAsync()方法内部有异常时, 是无法得到一个正确的和的, 所以当其中任何一个失败后, 应该取消另一个协程的计算. 这就是所谓的 结构化并发 .

suspend fun concurrentSum(): Int = coroutineScope { //注意: 这里必须创建了一个新的作用域
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}

这种情况下,如果在concurrentSum函数内部发生了错误,并且它抛出了一个异常,所有在作用域中启动的协程都会被取消。于是这个函数可以在协程构建器块内使用了.

import kotlinx.coroutines.*
import kotlin.system.*

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        println("The answer is ${concurrentSum()}")
    }
    println("Completed in $time ms")    
}

调度器与线程

协程上下文包含一个协程调度器 (CoroutineDispatcher), 它确定了协程真正在哪些线程执行。所有的协程构建器诸如 launchasync 接收一个可选的 CoroutineContext 参数,它可以被用来显式的为一个新协程或其它上下文元素指定一个调度器。

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch { // 运行在父协程的上下文中,即 runBlocking 主协程
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { // 将会获取默认调度器
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) { // 将使它获得一个新的线程
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }    
}

Dispatchers.Unconfined 协程调度器在调用它的线程启动了一个协程,但它仅仅只是运行到第一个挂起点。挂起后,它恢复回哪个线程执行,这完全由被调用的挂起函数来决定。

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) { // 非受限的——将和主线程一起工作
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
    launch { // 父协程的上下文,主 runBlocking 协程
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }    
}
Unconfined      : I'm working in thread main // 挂起前在调用者线程中立即执行, 这是其特点, 不延迟, 不切换线程, 有时有这种需求
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor // 可以看出恢复后在另一个线程中执行
main runBlocking: After delay in thread main

线程切换

在协程中可以使用withContext方法来切换线程, 在的线程之间跳转运行:

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() {
    newSingleThreadContext("Ctx1").use { ctx1 ->
            newSingleThreadContext("Ctx2").use { ctx2 ->
                    runBlocking(ctx1) {
                        log("Started in ctx1")
                        withContext(ctx2) {
                            log("Working in ctx2")
                        }
                        log("Back to ctx1")
                    }
            }
    }    
}

这里使用runBlocking, 并且给它指定了一个上下文, 来创建了一个协程, 并且另一个使用withContext函数来改变协程的上下文,而仍然驻留在相同的协程中,正如可以在下面的输出中所见到的:

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

注意,在这个例子中,当我们不再需要某个在 newSingleThreadContext 中创建的线程的时候, 它使用了 Kotlin 标准库中的 use 函数来释放该线程。或者使用close方法来手动释放.

子协程

协程之前有父子关系, 在结构化并发中讲过, 如没有显式指定上下文, 子协程继承父协程的上下文, 父协程的取消会导致整个范围内的子协程都取消. 但是如果使用全局范围来创建协程, 这样的协程是没有父协程的, 它不受其他协程取消的影响, 如下:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    // 启动一个协程来处理某种传入请求(request)
    val request = launch {
        // 孵化了两个子作业, 其中一个通过 GlobalScope 启动, 它不受外围上下文协程取消的影响!!
        GlobalScope.launch {
            println("job1: I run in GlobalScope and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // 另一个则使用父协程的上下文
        launch {
            delay(100)
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
    }
    delay(500)
    request.cancel() // 取消请求(request)的执行
    delay(1000) // 延迟一秒钟来看看发生了什么
    println("main: Who has survived request cancellation?")
}
job1: I run in GlobalScope and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?

配置 CoroutineContext

协程上下文重载了加号操作符, 可以通过加号来方便地组合指定上下文的元素:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Default + CoroutineName("test")) {
        println("I'm working in thread ${Thread.currentThread().name}")
    }    
}

如这些指定了协程上下文的调度器和名字.

与 Android Activity 生命周期绑定

  • Activity 实现 CoroutineScope 接口, 真正的实现代理给CoroutineScope(Dispatchers.Default).
  • 在 Activity 的 destroy() 方法里取消所有协程.
  • Activity 里启动的协程承袭同一个 CoroutineScope, 所以可以一起取消.
import kotlin.coroutines.*
import kotlinx.coroutines.*

class Activity : CoroutineScope by CoroutineScope(Dispatchers.Default) {

    fun destroy() {
        cancel() // Extension on CoroutineScope
    }

    // class Activity continues
    fun doSomething() {
        // 在示例中启动了 10 个协程,且每个都工作了不同的时长
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间
                println("Coroutine $i is done")
            }
        }
    }
} // Activity 类结束

fun main() = runBlocking<Unit> {
    val activity = Activity()
    activity.doSomething() // 运行测试函数
    println("Launched coroutines")
    delay(500L) // 延迟半秒钟
    println("Destroying activity!")
    activity.destroy() // 取消所有的协程
    delay(1000) // 为了在视觉上确认它们没有工作    
}
Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!

异常处理

协程被取消的时候会在挂起点抛出CancellationException,但所有的异常处理器忽略这个异常, 即不处理它. 异常的传播有两种方式: 自动传播, 默认处理, 或者 由用户处理 . 使用协程构建器launchactor创建协程时, 如果运行时抛出异常, 异常会自动向上层传播, 最终到默认的处理器处理. 类似 Java 的 Thread.uncaughtExceptionHandler来处理 . 另一种方式是 由用户处理 , 比如说,通过 tryawaitreceive 的调用.

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = GlobalScope.launch {
        println("Throwing exception from launch")
        throw IndexOutOfBoundsException() // 由自动传播到 Thread.defaultUncaughtExceptionHandler 处理, 会在控制台打印出异常信息
    }
    job.join()
    println("Joined failed job")
    val deferred = GlobalScope.async {
        println("Throwing exception from async")
        throw ArithmeticException() // 没有打印任何东西,依赖用户去调用 await 时处理
    }
    try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }
}
Throwing exception from launch
Exception in thread "DefaultDispatcher-worker-2 @coroutine#2" java.lang.IndexOutOfBoundsException
Joined failed job
Throwing exception from async
Caught ArithmeticException

如果要重新定义协程的全局异常处理器, 而不是使用默认的呢? JVM 平台中 通过 ServiceLoader 为所有的协程注册一个 CoroutineExceptionHandler作为全局的异常处理器, 它与Thread.defaultUncaughtExceptionHandler类似. 而在 Android 中, 则是通过注册Thread.uncaughtExceptionPreHandler实现的.

import kotlinx.coroutines.*

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }
    val job = GlobalScope.launch(handler) {
        throw AssertionError()
    }
    val deferred = GlobalScope.async(handler) {
        throw ArithmeticException() // Nothing will be printed, relying on user to call deferred.await()
    }
    joinAll(job, deferred)    
}
Caught java.lang.AssertionError

这里特别注意: launch构建的协程使用了指定的异常处理器来处理异常了, 而调用async时, 虽然指定了异常处理器handler, 但是它并没有处理异常! 为什么会这样? 这是因为CoroutineExceptionHandler仅在异常期望不是由用户来处理时, 它才处理异常, 而async是一个期望由用户来处理异常的方法, 另外它的异常是当客户端调用await()时才会抛出, 并且用户代码需要用try来捕获, 这样异常才会被用户代码处理!

取消与异常紧密相关。协程内部使用 CancellationException 来进行取消,这个异常会被所有的异常处理器忽略,如果你catch这个异常, 它应该仅仅用来调试. 当一个协程被job.cancel()取消, 而没有传入表示原因的cause参数时, 它将终止, 但不会取消它的父协程. 如果一个协程遇到CancellationException以外的异常, 那么它的父协程将会因为异常而取消, 并且在所有子协程终止后, 父协程来处理这个异常. 这种行为不能被覆盖, 它是结构化并发的基础.

import kotlinx.coroutines.*

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }
    val job = GlobalScope.launch(handler) {
        launch { // the first child
            try {
                delay(Long.MAX_VALUE)
            } finally {
                withContext(NonCancellable) {
                    println("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    println("The first child finished its non cancellable block")
                }
            }
        }
        launch { // the second child
            delay(10)
            println("Second child throws an exception")
            throw ArithmeticException()
        }
    }
    job.join()    
}
Second child throws an exception
Children are cancelled, but exception is not handled until all children terminate
The first child finished its non cancellable block
Caught java.lang.ArithmeticException

以上实例可以看出, 第2个协程抛出一个异常导致第1个协程取消了, 但最终这个异常在所有子协程终止后由父协程的异常处理器捕获并处理.

监督 (Supervision)

在结构化并发中, 如果一个协程遇到CancellationException以外的异常, 那么它的父协程及其子协程都取消。这可以满足大多数需求, 但如果要求, 一个子协程因异常而取消, 不要级联到父协程及其兄弟协程呢?

监督作业(Supervision job)

防止单个协程因异常而取消导致的传播, 这就是监督作业(SupervisorJob)的作用, 它类似于常规的 Job,唯一的不同是:SupervisorJob 的取消只会向下传播。

import kotlinx.coroutines.*

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        // 启动第一个子作业——这个示例将会忽略它的异常(不要在实践中这么做!)
        val firstChild = launch(CoroutineExceptionHandler { _, _ ->  }) {
            println("First child is failing")
            throw AssertionError("First child is cancelled")
        }
        // 启动第两个子作业
        val secondChild = launch {
            firstChild.join()
            // 取消了第一个子作业且没有传播给第二个子作业
            println("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                // 但是取消了监督的传播
                println("Second child is cancelled because supervisor is cancelled")
            }
        }
        // 等待直到第一个子作业失败且执行完成
        firstChild.join()
        println("Cancelling supervisor")
        supervisor.cancel()
        secondChild.join()
    }
}
First child is failing
First child is cancelled: true, but second one is still active
Cancelling supervisor
Second child is cancelled because supervisor is cancelled

以上输出说明: 第1个协程的异常仅导致其自身取消, 第2个协程不受影响, 继续运行, 当supervisor取消时它才结束.

监督作用域(Supervision scope)

对于限定作用域的并发,监督作用域(supervisorScope) 可以被用来替代 coroutineScope 来实现相同的目的。它只会单向的传播并且当它自身执行失败的时候, 它将所有的子协程全部取消。它也会在所有的子作业执行结束前等待, 就像 coroutineScope 所做的那样。

import kotlin.coroutines.*
import kotlinx.coroutines.*

fun main() = runBlocking {
    try {
        supervisorScope {
            val child = launch {
                try {
                    println("Child is sleeping")
                    delay(Long.MAX_VALUE)
                } finally {
                    println("Child is cancelled")
                }
            }
            // 使用 yield 来给我们的子作业一个机会来执行打印
            yield()
            println("Throwing exception from scope")
            throw AssertionError() // 这里人造一个异常使它自身失败
        }
    } catch(e: AssertionError) {
        println("Caught assertion error")
    }
}
Child is sleeping
Throwing exception from scope
Child is cancelled
Caught assertion error

监督协程中的异常

特别注意, 由于监督协程的失败, 不会向上传播, 所以每个子协程应该自己处理异常, 这是与普通协程的重要区别.

import kotlin.coroutines.*
import kotlinx.coroutines.*

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }
    supervisorScope {
        val child = launch(handler) { // 注意: 子协程需自己处理异常
            println("Child throws an exception")
            throw AssertionError()
        }
        println("Scope is completing")
    }
    println("Scope is completed")
}
Scope is completing
Child throws an exception
Caught java.lang.AssertionError
Scope is completed

共享可变状态与并发

协程可用多线程调度器(比如默认的 Dispatchers.Default)并发执行。这样就会有多线程中的并发问题, 其中解决方案与多线程领域中的解决方案类似, 但有一些并不一样.

问题

我们启动一百个协程,它们都做一千次相同的操作。我们同时会测量它们的完成时间以便进一步的比较:

import kotlinx.coroutines.*
import kotlin.system.*    

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同一动作的次数
    val time = measureTimeMillis {
        coroutineScope { // 协程的作用域
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

显示结果并不是期望的打印 Counter = 100000, 因为多线程读写共享变量没有同步.

volatile 无济于事

@Volatile // 在 Kotlin 中 `volatile` 是一个注解
var counter = 0

线程安全的数据结构

var counter = AtomicInteger()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}

这是针对此类特定问题的最快解决方案。它适用于普通计数器、集合、队列和其他标准数据结构以及它们的基本操作。然而,它并不容易被扩展来应对复杂状态、或一些没有现成的线程安全实现的复杂操作。

以细粒度限制线程

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            withContext(counterContext) {  // 注意: 将每次自增限制在单线程上下文中
                counter++
            }
        }
    }
    println("Counter = $counter")
}

以粗粒度限制线程

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    // 将一切都限制在单线程上下文中, 对比上一节, 观察有什么不同
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

这段代码比上一节的运行更快, 因为它不需要从多线程 Dispatchers.Default 上下文切换到单线程上下文。

互斥

该问题的互斥解决方案:使用永远不会同时执行的 关键代码块 来保护共享状态的所有修改。在阻塞的世界中,你通常会为此目的使用 synchronized 或者 ReentrantLock。 在协程中的替代品叫做 Mutex 。它具有 lockunlock 方法, 可以隔离关键的部分。关键的区别在于 Mutex.lock() 是一个挂起函数,它不会阻塞线程。

还有 withLock 扩展函数,可以方便的替代常用的 mutex.lock(); try { …… } finally { mutex.unlock() } 模式:

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // 用锁保护每次自增
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

此示例中锁是细粒度的,因此会付出一些性能代价。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment