iT邦幫忙

2021 iThome 鐵人賽

DAY 14
0
Software Development

Coroutine 停看聽系列 第 14

Day14:內建的 suspend 函式,好函式不用嗎? (3)

這是我們內建的 suspend 函式第三篇,讓我們看看有哪些吧:


joinAll()

還記得 join() 的功能是什麼嗎?join() 可以讓目前的 coroutine 暫停直到它完成。簡單的示範如下:

fun main() = runBlocking {
    val job = launch {
        delay(100)
        println("inner")
    }
    job.join()
    println("outer")
}

→ 呼叫子 coroutine 的 join() 讓原本應該要先列印 outer 在列印 inner 的動作,改成子 coroutine 的內容先執行,再執行下面的 println("outer")

所以 join() 的意思就是讓該 job 能夠加入至目前的執行順序中。

joinAll() 則是可以同時執行多個 job 的 join() ,也就是說等效於 jobs.forEach{ it.join()}

suspend fun joinAll(vararg jobs: Job)
suspend fun Collection<Job>.joinAll()

→ joinAll() 有兩種格式,一種是使用 vararg ,另一種是 Collection 的。

fun main() = runBlocking {
    val jobs = mutableListOf<Job>()
    repeat(10) {
        jobs.add(launch {
            delay(100)
            println("inner: $it: ${Thread.currentThread().name}")
        })
    }
    jobs.joinAll()
    println("outer: ${Thread.currentThread().name}")
}
inner: 0: main
inner: 1: main
inner: 2: main
inner: 3: main
inner: 4: main
inner: 5: main
inner: 6: main
inner: 7: main
inner: 8: main
inner: 9: main
outer: main

→ 如我們所預期的,會先執行全部的子 coroutine ,完成之後才會做執行外層的。

那麼這個就有衍生一個問題,如果其中一個 Job 發生例外,或被取消了呢?

Job 被取消

我們把上面的範例改成:

fun main() = runBlocking {
    val jobs = mutableListOf<Job>()
    repeat(10) {
        jobs.add(launch {
            withTimeout(500) {
                delay(100L * it)
                println("inner: $it: ${Thread.currentThread().name}")
            }
        })
    }
    jobs.joinAll()
    println("outer: ${Thread.currentThread().name}")
}

子 coroutine 每次呼叫 delay() 的時間會隨著次數增加,我們使用 withTimeout(500) 將這段程式碼包起來,也就是說當超過 500 毫秒的 job 都會被取消。

所以我們可以試想一下上面這個範例會是怎麼的結果,如果每次延遲 100毫秒 * it,那麼會在 it = 6 的時候開始被取消。

inner: 0: main
inner: 1: main
inner: 2: main
inner: 3: main
inner: 4: main
inner: 5: main
outer: main

Job 發生例外

我們將上面這段稍作修改,

class Day14 {
    private val scope = CoroutineScope(Job() + CoroutineExceptionHandler { _, e -> println(e) })
    suspend fun tryException() {
        val jobs = mutableListOf<Job>()
        repeat(10) {
            jobs.add(scope.launch {
                delay(100L)
                yield()
                println("inner: $it: ${Thread.currentThread().name}")
                if (it == 2) {
                    throw RuntimeException("Incorrect")
                }
            })
        }
        jobs.joinAll()
        println("outer: ${Thread.currentThread().name}")
    }
}

→ 當 it = 2 的時候,就會拋出一個 RuntimeException

在 coroutine 中,我們可以使用 CoroutineExceptionHandler 攔截例外。

inner: 5: DefaultDispatcher-worker-3
inner: 6: DefaultDispatcher-worker-6
inner: 4: DefaultDispatcher-worker-7
inner: 7: DefaultDispatcher-worker-5
inner: 0: DefaultDispatcher-worker-8
inner: 3: DefaultDispatcher-worker-2
inner: 2: DefaultDispatcher-worker-4
inner: 1: DefaultDispatcher-worker-1
inner: 8: DefaultDispatcher-worker-6
inner: 9: DefaultDispatcher-worker-7
java.lang.RuntimeException: Incorrect
outer: main

awaitAll()

無獨有偶,coroutine 也有針對所有的 Deferred 一起呼叫的 awaitAll()

public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T>
suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T>

→ awaitAll() 有兩種格式,一種是使用 vararg ,另一種是 Collection 的。

awaitAll() 會輸出一個 List,其中 T 是 Deferred 的回傳值。

下面的範例,將兩個 async 利用 awaitAll() 計算。

fun main() = runBlocking{
	val deferred = mutableListOf(
	        async {
	            delay(100)
	            println("inner: async1")
	            1
	        },
	        async {
	            delay(150)
	            println("inner: async2")
	            2
	        }
	    )
	    val result = deferred.awaitAll().sum()
	    println("outer: $result")
}

→ 由於 awaitAll() 回傳的是 List,所以如果想要把所有 async 裏面的值合成一個在輸出,就要看你要怎麼合成。如上例,我使用 sum() 來將 awaitAll() 輸出的所有整數加總起來。所以最後的結果是:

inner: async1
inner: async2
outer: 3

例外處理

如同 await() ,我們一樣是使用 try-catch 做例外處理。

class Day14 {
    private val scope = CoroutineScope(Job()) 

    suspend fun tryAsyncException(): Int {
        val deferred = mutableListOf(
            scope.async {
                delay(100)
                println("inner: async1")
                1
            },
            scope.async {
                delay(150)
                println("inner: async2")
                throw RuntimeException("Incorrect")
                2
            }
        )
        return deferred.awaitAll().sum()
    }
}

fun main() = runBlocking{
	val day14 = Day14()

	try {
        day14.tryAsyncException()
    } catch (e: RuntimeException) {
        println(e)
    }
    println("outer")
}
inner: async1
inner: async2
java.lang.RuntimeException: Incorrect
outer

cancelAndJoin()

還記得上一篇的範例嗎?

fun main() = runBlocking {
    val job = launch {
        println("inside: ${Thread.currentThread().name}")
        withContext(Dispatchers.Default) {
            delay(200)
            println("Default: ${Thread.currentThread().name}")
        }
    }
    delay(100)
    job.cancel()
    job.join()
    println("outer: ${Thread.currentThread().name}")
}

→在這邊我們使用了 job.cancel() 接著是 job.join()

coroutine 提供了一個更簡單 suspend 函式,那就是

cancelAndJoin()

public suspend fun Job.cancelAndJoin() {
    cancel()
    return join()
}

所以我們可以將上面的範例改成:

fun main() = runBlocking {
    val job = launch {
        println("inside: ${Thread.currentThread().name}")
        withContext(Dispatchers.Default) {
            delay(200)
            println("Default: ${Thread.currentThread().name}")
        }
    }
    delay(100)
    job.cancelAndJoin()
    println("outer: ${Thread.currentThread().name}")
}
inside: main
outer: main

小結

coroutine 不愧是 Kotlin 團隊開發的,為了讓我們的呼叫更精簡,有了很多複合的函式,了解這些函式可以讓我們的程式更簡潔一些。

參考資料

joinAll

awaitAll

cancelAndJoin

特別感謝

Kotlin Taiwan User Group

Kotlin 讀書會


由本系列文改編的《Kotlin 小宇宙:使用 Coroutine 優雅的執行非同步任務》已經上市囉。

有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局


上一篇
Day13:內建的 suspend 函式,好函式不用嗎? (2)
下一篇
Day15:Channel 的第一堂課
系列文
Coroutine 停看聽30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言