Skip to content

Instantly share code, notes, and snippets.

@fluidsonic
Last active April 4, 2020 15:12
Show Gist options
  • Save fluidsonic/f7b2b0084f184932ea3be4cf0074496a to your computer and use it in GitHub Desktop.
Save fluidsonic/f7b2b0084f184932ea3be4cf0074496a to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
/**
 * Demo entry point that runs three scenarios one after another, each in its own
 * `coroutineScope`:
 *
 * 1. a plain flow with a single collector,
 * 2. the same flow shared through `broadcastIn(...).asFlow()` with two collectors,
 * 3. the same flow shared through this file's `broadcast()` with two collectors.
 *
 * In every scenario the values 1 and 2 are sent through a rendezvous channel and each
 * collector takes two seconds to process a value, so the printed log lines show how
 * emission and processing interleave.
 */
@Suppress("EXPERIMENTAL_API_USAGE")
suspend fun main() {
    // Builds a cold flow that drains [source] and logs right before and right after each emit.
    fun loggingFlowOf(source: ReceiveChannel<Int>): Flow<Int> =
        flow {
            source.consumeEach { value ->
                println("$value: before emit")
                emit(value)
                println("$value: after emit")
            }
        }

    // Sends the two demo values and then closes the channel so the flow can complete.
    suspend fun feed(source: SendChannel<Int>) {
        source.send(1)
        source.send(2)
        source.close()
    }

    // Scenario 1: one collector on the plain flow.
    coroutineScope {
        println("NORMAL FLOW")

        val source = Channel<Int>()
        val numbers = loggingFlowOf(source)

        launch {
            numbers.collect { value ->
                println("$value: processing started")
                delay(2_000)
                println("$value: processing completed")
            }
        }

        // Short pause before the values are sent.
        delay(10)
        feed(source)
    }

    println()

    // Scenario 2: two collectors on a flow backed by a broadcast channel.
    coroutineScope {
        println("BROADCAST CHANNEL FLOW")

        val source = Channel<Int>()
        val shared = loggingFlowOf(source)
            .broadcastIn(this)
            .asFlow()

        launch {
            shared.collect { value ->
                println("$value: processing A started")
                delay(2_000)
                println("$value: processing A completed")
            }
        }

        delay(10)

        launch {
            shared.collect { value ->
                println("$value: processing B started")
                delay(2_000)
                println("$value: processing B completed")
            }
        }

        delay(10)
        feed(source)
    }

    println()

    // Scenario 3: two collectors on the BroadcastFlow wrapper defined in this file.
    coroutineScope {
        println("BROADCAST FLOW")

        val source = Channel<Int>()
        val shared = loggingFlowOf(source).broadcast()

        launch {
            shared.collect { value ->
                println("$value: processing A started")
                delay(2_000)
                println("$value: processing A completed")
            }
        }

        delay(10)

        launch {
            shared.collect { value ->
                println("$value: processing B started")
                delay(2_000)
                println("$value: processing B completed")
            }
        }

        delay(10)
        feed(source)
    }
}
/**
 * Wraps this flow in a [BroadcastFlow].
 *
 * @return a new [BroadcastFlow] that uses the receiver as its upstream flow.
 */
fun <T> Flow<T>.broadcast(): Flow<T> {
    return BroadcastFlow(this)
}
/**
 * A flow that shares one collection of the upstream [flow] between its collectors.
 *
 * The first collector to arrive becomes the "leader": it collects [flow] and forwards every
 * value, one collector after another, to all collectors registered at that moment. Each value
 * is emitted inside the coroutine context in which the receiving collector called `collect`.
 * Every other collector only registers itself and suspends until the leader's upstream
 * collection finishes, rethrowing the upstream failure if there was one.
 *
 * Because values are forwarded sequentially, the upstream is not asked for the next value
 * until every registered collector has finished handling the current one.
 *
 * Note: the completion signal is never reset, so a collector that arrives after the upstream
 * collection has finished returns (or fails) immediately without receiving any values.
 *
 * @param flow the upstream flow that is collected once and shared.
 */
@OptIn(FlowPreview::class)
class BroadcastFlow<T>(
    private val flow: Flow<T>
) : AbstractFlow<T>() {

    // Registered collectors, each paired with the context it was collected in. Guarded by [mutex].
    private val collectors: MutableList<Pair<FlowCollector<T>, CoroutineContext>> = mutableListOf()

    // Set by the leader when it starts collecting upstream; completed when upstream ends. Guarded by [mutex].
    private var completion: CompletableDeferred<Unit>? = null

    private val mutex = Mutex()

    /**
     * Registers [collector] and then either drives the upstream collection (first collector)
     * or waits for the already running one to finish (any later collector).
     *
     * The collector is always unregistered before this function returns or throws.
     *
     * @param collector the downstream collector that should receive the shared values.
     */
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        val collectorContext = coroutineContext
        val leaderSignal = CompletableDeferred<Unit>()

        // Register and decide the role in one critical section. `withLock` guarantees the
        // mutex is released on every path, unlike paired lock()/unlock() calls.
        val activeSignal = mutex.withLock {
            collectors += collector to collectorContext
            completion ?: leaderSignal.also { completion = it }
        }

        try {
            if (activeSignal !== leaderSignal) {
                // Follower: values are pushed to us by the leader; just wait for it to finish.
                activeSignal.await()
            } else {
                // Leader: collect upstream and fan every value out to the current collectors.
                try {
                    flow.collect { value ->
                        // Snapshot under the lock so emitting never holds the mutex.
                        val targets = mutex.withLock { collectors.toList() }
                        for ((target, targetContext) in targets) {
                            withContext(targetContext) {
                                target.emit(value)
                            }
                        }
                    }
                    leaderSignal.complete(Unit)
                } catch (e: Throwable) {
                    // Propagate the failure (including cancellation) to waiting followers.
                    leaderSignal.completeExceptionally(e)
                    throw e
                }
            }
        } finally {
            // Cleanup must also run when this coroutine is already cancelled. A plain
            // `mutex.withLock` is cancellable and could throw before removing the collector,
            // leaving a dead collector registered for the leader to emit into.
            withContext(NonCancellable) {
                mutex.withLock {
                    collectors.removeIf { it.first === collector }
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment