import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val channel1 = Channel<Int>()
val channel2 = Channel<Int>()
launch {
for (i in 1..50) {
delay(100)
channel1.send(i)
}
channel1.close()
}
launch {
for (i in 6..10) {
delay(120)
channel2.send(i)
}
channel2.close()
}
// use channelFlow combines two channel's data stream
val combinedFlow = channelFlow {
launch {
for (value in channel1) {
send(value)
}
}
launch {
for (value in channel2) {
send(value)
}
}
}
combinedFlow.collect { value ->
println("Collected data: $value")
}
}
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val eventsChannel = Channel<String>()
launch {
repeat(10) {
delay(300)
eventsChannel.send("Event $it")
}
eventsChannel.close() // must Close the channel when done
}
val eventsFlow = channelFlow<String> {
for (event in eventsChannel) {
send("Processed: $event")
}
}
eventsFlow.collect { processedEvent ->
println("collected: " + processedEvent)
}
}
使用 channelFlow 時, 在 suspend lambda中有是當的取消處理,這是要避免 leak 或是意外的行為.
恩, 有點趕