Skip to content

Instantly share code, notes, and snippets.

@ZakTaccardi
Last active March 12, 2018 16:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ZakTaccardi/af72de646b10bf9d132a078cf4739cbe to your computer and use it in GitHub Desktop.
Save ZakTaccardi/af72de646b10bf9d132a078cf4739cbe to your computer and use it in GitHub Desktop.
combineLatest in coroutines adventure
/**
 * Combines this channel with [otherSource]: every time either source delivers an element,
 * [combineFunction] is applied to the most recent element of each source and the result is
 * sent to the returned channel.
 *
 * Nothing is emitted until both sources have delivered at least one element.
 * The producer body finishes only after both sources have been fully consumed.
 *
 * @param otherSource the second channel to combine with the receiver.
 * @param context coroutine context the producer is started in (defaults to `Unconfined`).
 * @param combineFunction maps the latest element of each source to an output element. It is
 *   invoked while an internal [Mutex] is held, so invocations never overlap.
 * @return a channel of combined values.
 */
suspend fun <A : Any?, B : Any?, R> ReceiveChannel<A>.combineLatest(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
    val sourceA: ReceiveChannel<A> = this@combineLatest
    val sourceB: ReceiveChannel<B> = otherSource
    val latestA = AtomicReference<A>()
    val latestB = AtomicReference<B>()
    var aInitialized = false
    var bInitialized = false
    // Makes "store latest value, then combine and send" a single step, so the two consumers
    // cannot interleave between updating a value and emitting the pair.
    val mutex = Mutex()

    // Start exactly ONE consumer for B, before A is read. Previously a new B consumer was
    // launched for every element of A, which piled up competing coroutines on sourceB and
    // left B unread until A had produced its first element.
    val consumerB = launch(coroutineContext) {
        sourceB.consumeEach { b ->
            mutex.withLock {
                latestB.set(b)
                bInitialized = true
                if (aInitialized) {
                    send(combineFunction(latestA.get(), b))
                }
            }
        }
    }

    sourceA.consumeEach { a ->
        mutex.withLock {
            latestA.set(a)
            aInitialized = true
            if (bInitialized) {
                send(combineFunction(a, latestB.get()))
            }
        }
    }

    // A is exhausted; keep the producer alive until B has been drained as well.
    consumerB.join()
}
/**
 * `select`-based variant of combineLatest: waits on both sources at once and, whenever either
 * delivers an element, sends `combineFunction(latestA, latestB)` to the returned channel.
 * Nothing is emitted until both sources have delivered at least one element.
 *
 * The original author marked this variant as one "that doesn't seem to work well".
 *
 * Receipt of a first element is tracked with explicit flags rather than by comparing the stored
 * value to `null`, so a `null` element (when [A] or [B] is a nullable type) counts as a real value.
 *
 * NOTE(review): per the kotlinx.coroutines documentation, a `select` using `onReceive` fails
 * with an exception once that channel is closed for receive, so this producer presumably ends
 * exceptionally as soon as either source closes — confirm before relying on completion.
 *
 * @param otherSource the second channel to combine with the receiver.
 * @param context coroutine context the producer is started in (defaults to `Unconfined`).
 * @param combineFunction maps the latest element of each source to an output element.
 * @return a channel of combined values.
 */
@Suppress("UNCHECKED_CAST")
suspend fun <A, B, R> ReceiveChannel<A>.combineLatest2(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
    val sourceA: ReceiveChannel<A> = this@combineLatest2
    val sourceB: ReceiveChannel<B> = otherSource
    var latestA: A? = null
    var latestB: B? = null
    var hasA = false
    var hasB = false
    whileSelect {
        sourceA.onReceive { a ->
            latestA = a
            hasA = true
            if (hasB) {
                // hasB guarantees latestB holds a received B (possibly null when B is nullable).
                send(combineFunction(a, latestB as B))
            }
            true // keep selecting
        }
        sourceB.onReceive { b ->
            latestB = b
            hasB = true
            if (hasA) {
                // hasA guarantees latestA holds a received A (possibly null when A is nullable).
                send(combineFunction(latestA as A, b))
            }
            true // keep selecting
        }
    }
}
/**
 * Exercises `combineLatest` on two broadcast-channel subscriptions under three coroutine
 * contexts and verifies the combined pairs are observed in the expected order.
 *
 * NOTE(review): the expected sequence assumes names and ages are combined in exactly the order
 * they were sent; because the two sources are consumed by separate coroutines this presumably
 * depends on scheduling — confirm the ordering guarantee before treating a failure as a regression.
 */
@Test
fun operator_combineLatest() {
    fun runTest(context: CoroutineContext) {
        println("Running test for $context")
        val mockObserver = mock<(Pair<String, Int>) -> Unit> { }
        val logger: (Pair<String, Int>) -> Unit = { println(it) }
        val sourceNames = ArrayBroadcastChannel<String>(1000)
        val sourceAges = ArrayBroadcastChannel<Int>(1000)
        // Subscribe before anything is sent so no element is missed.
        val sourceAChannel = sourceNames.openSubscription()
        val sourceBChannel = sourceAges.openSubscription()
        val consumerJob = launch(context) {
            sourceAChannel
                .combineLatest(sourceBChannel) { name, age -> Pair(name, age) }
                .consumeEach {
                    logger(it)
                    mockObserver(it)
                }
        }
        val producerJob = launch(CommonPool) {
            sourceNames.send("Zak")
            sourceNames.send("Grace")
            sourceAges.send(24)
            sourceNames.send("Kelly")
            sourceAges.send(25)
            sourceNames.send("Jack")
            sourceAges.send(27)
            sourceAges.send(28) // happy birthday
            // Close both sources so the combined channel, and with it the consumer, can finish.
            sourceNames.close()
            sourceAges.close()
        }
        runBlocking {
            producerJob.join()
            // Wait for the consumer too; otherwise verification can run before every
            // combined value has reached the observer.
            consumerJob.join()
        }
        val inOrder = InOrderOnType(mockObserver)
        inOrder.verify(mockObserver, times(1)).invoke(Pair("Grace", 24))
        inOrder.verify(mockObserver, times(1)).invoke(Pair("Kelly", 24))
        inOrder.verify(mockObserver, times(1)).invoke(Pair("Kelly", 25))
        inOrder.verify(mockObserver, times(1)).invoke(Pair("Jack", 25))
        inOrder.verify(mockObserver, times(1)).invoke(Pair("Jack", 27))
        inOrder.verify(mockObserver, times(1)).invoke(Pair("Jack", 28))
        verifyNoMoreInteractions(mockObserver)
        println("Test passed for $context")
    }
    runTest(Unconfined)
    // Release the dedicated thread once this run is done instead of leaking it.
    newSingleThreadContext("SingleThreadContext").use { runTest(it) }
    runTest(CommonPool)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment