Last active
March 12, 2018 16:03
-
-
Save ZakTaccardi/af72de646b10bf9d132a078cf4739cbe to your computer and use it in GitHub Desktop.
combineLatest in coroutines adventure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Combines this channel with [otherSource]: once both sources have delivered at least one
 * element, every new element from either source emits
 * `combineFunction(latestFromThis, latestFromOther)` on the returned channel.
 *
 * Each source is consumed by exactly one coroutine. Updates of the "latest" state and the
 * resulting `send` happen under a single [Mutex], so the pair handed to [combineFunction]
 * is always read consistently.
 *
 * @param otherSource the second channel whose latest element is paired with this channel's.
 * @param context context handed to `produce`; defaults to [Unconfined].
 * @param combineFunction merges the most recent element of each source into one result.
 * @return a channel carrying the combined results.
 */
suspend fun <A : Any?, B : Any?, R> ReceiveChannel<A>.combineLatest(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
    val sourceA: ReceiveChannel<A> = this@combineLatest
    val sourceB: ReceiveChannel<B> = otherSource

    val latestA = AtomicReference<A>()
    val latestB = AtomicReference<B>()
    // Separate flags (instead of null checks) so that null elements count as "received".
    var aInitialized = false
    var bInitialized = false
    val mutex = Mutex()

    // Launch the single consumer of sourceB up front. Previously this launch lived inside the
    // sourceA loop, which started a new competing sourceB consumer for every A element and
    // left sourceB unconsumed until the first A element arrived.
    val consumerB = launch(coroutineContext) {
        sourceB.consumeEach { b ->
            mutex.withLock {
                latestB.set(b)
                bInitialized = true
                if (aInitialized) {
                    send(combineFunction(latestA.get(), latestB.get()))
                }
            }
        }
    }

    sourceA.consumeEach { a ->
        mutex.withLock {
            latestA.set(a)
            aInitialized = true
            if (bInitialized) {
                send(combineFunction(latestA.get(), latestB.get()))
            }
        }
    }

    // Do not let the producer body finish while sourceB is still being consumed; otherwise
    // the sourceB consumer could try to send after this block has returned.
    consumerB.join()
}
// Alternative combineLatest built on whileSelect; it does not seem to work well.
/**
 * Select-based variant of combineLatest: a single loop receives from whichever of the two
 * sources has an element available, remembers it as that source's latest value, and, once
 * both sources have delivered at least one element, sends
 * `combineFunction(latestA, latestB)` on the returned channel.
 *
 * Arrival is tracked with explicit flags rather than by comparing the stored values with
 * `null`, so sources whose element type is nullable are combined correctly.
 *
 * NOTE(review): `onReceive` is used for both sources, so the loop presumably terminates with
 * an exception as soon as either source is closed — confirm against the select documentation
 * of the coroutines version in use.
 *
 * @param otherSource the second channel whose latest element is paired with this channel's.
 * @param context context handed to `produce`; defaults to [Unconfined].
 * @param combineFunction merges the most recent element of each source into one result.
 * @return a channel carrying the combined results.
 */
@Suppress("UNCHECKED_CAST")
suspend fun <A, B, R> ReceiveChannel<A>.combineLatest2(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
    val sourceA: ReceiveChannel<A> = this@combineLatest2
    val sourceB: ReceiveChannel<B> = otherSource

    var latestA: A? = null
    var latestB: B? = null
    // "Has this source produced anything yet?" — kept separate from the values themselves
    // because null may be a legitimate element when A or B is a nullable type.
    var hasA = false
    var hasB = false

    whileSelect {
        sourceA.onReceive { a ->
            latestA = a
            hasA = true
            if (hasB) {
                // Safe: hasA/hasB guarantee both slots hold a received element.
                send(combineFunction(latestA as A, latestB as B))
            }
            true // keep selecting
        }
        sourceB.onReceive { b ->
            latestB = b
            hasB = true
            if (hasA) {
                // Safe: hasA/hasB guarantee both slots hold a received element.
                send(combineFunction(latestA as A, latestB as B))
            }
            true // keep selecting
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Feeds names and ages through two broadcast channels, combines their subscriptions with
 * `combineLatest`, and verifies the exact sequence of pairs observed. The scenario is run
 * once for each of three coroutine contexts.
 */
@Test
fun operator_combineLatest() {
    fun runTest(context: CoroutineContext) {
        println("Running test for $context")

        // Mocked sink used for verification, plus a plain printer for readable test output.
        val observer = mock<(Pair<String, Int>) -> Unit> { }
        val printPair: (Pair<String, Int>) -> Unit = { println(it) }

        val names = ArrayBroadcastChannel<String>(1000)
        val ages = ArrayBroadcastChannel<Int>(1000)
        // Subscriptions are opened before anything is sent.
        val nameSubscription = names.openSubscription()
        val ageSubscription = ages.openSubscription()

        // Consumer: combine both subscriptions and forward every pair to the printer and the mock.
        launch(context) {
            val combined = nameSubscription.combineLatest(ageSubscription) { name, age -> Pair(name, age) }
            combined.consumeEach { pair ->
                printPair(pair)
                observer(pair)
            }
        }

        // Producer: interleaved names and ages.
        val producer = launch(CommonPool) {
            names.send("Zak")
            names.send("Grace")
            ages.send(24)
            names.send("Kelly")
            ages.send(25)
            names.send("Jack")
            ages.send(27)
            ages.send(28) // happy birthday
        }

        // NOTE(review): only the producer job is awaited here; the consumer launched above is
        // not joined before verification starts — confirm this cannot race on the
        // multi-threaded contexts.
        runBlocking {
            producer.join()
        }

        val expectedPairs = listOf(
            Pair("Grace", 24),
            Pair("Kelly", 24),
            Pair("Kelly", 25),
            Pair("Jack", 25),
            Pair("Jack", 27),
            Pair("Jack", 28)
        )
        val inOrder = InOrderOnType(observer)
        for (expected in expectedPairs) {
            inOrder.verify(observer, times(1)).invoke(expected)
        }
        verifyNoMoreInteractions(observer)

        println("Test passed for $context")
    }

    runTest(Unconfined)
    runTest(newSingleThreadContext("SingleThreadContext"))
    runTest(CommonPool)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment