Skip to content

Instantly share code, notes, and snippets.

@taer
Last active October 4, 2019 19:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save taer/9866ebc832610d941f4e5e54123abf82 to your computer and use it in GitHub Desktop.
Save taer/9866ebc832610d941f4e5e54123abf82 to your computer and use it in GitHub Desktop.
package channel
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
fun main() {
runBlocking {
val message = makeFlowFlow()
.distinctUntilChanged()
.take(10)
.toList()
println(message)
}
}
fun makeFlowFlow(): Flow<Map<String, Set<String>>> = channelFlow {
val currentState = ConcurrentHashMap<String, Set<String>>()
val jobTrackingMap = ConcurrentHashMap<String, Job>()
makeCatalog()
.distinctUntilChanged()
.onEach { newServices ->
val currentServices = currentState.keys
val toStart = newServices - currentServices
val toStop = currentServices - newServices
println("Starting $toStart. Killing $toStop")
toStart.forEach { serviceName ->
val launchIn = makeServices(serviceName)
.onStart { println("starting $serviceName") }
.onEach { newInstances ->
currentState[serviceName] = newInstances
println("Emitting $serviceName update of $currentState")
send(currentState.toMap())
}.launchIn(scope = this)
jobTrackingMap[serviceName] = launchIn
println("Spawned a job for $serviceName")
}
toStop.forEach { serviceName ->
println("Stopping $serviceName")
jobTrackingMap[serviceName]?.cancel()
currentState.remove(serviceName)
}
}.launchIn(this)
}
fun makeCatalog(): Flow<Set<String>> = channelFlow {
val catalog = listOf("a", "b", "c", "d")
val catalog2 = listOf("b", "c", "d", "e")
while (true) {
val fakeFutureFromclient = CompletableFuture.supplyAsync {
Thread.sleep(50)
setOf(catalog.random(), catalog2.random()).also {
println("Current catalog is $it")
}
}
send(fakeFutureFromclient.await())
}
}
fun makeServices(serviceName: String): Flow<Set<String>> = channelFlow {
val serviceList = List(5) {
"$serviceName-$it"
}
while (true) {
val fakeFutureFromclient = CompletableFuture.supplyAsync {
Thread.sleep(30)
setOf(serviceList.random(), serviceList.random()).also {
println("current state of $serviceName is $it")
}
}
val element = fakeFutureFromclient.await()
println("doing the emmission for $serviceName")
send(element)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment