Skip to content

Instantly share code, notes, and snippets.

@jgreenyer
Last active February 22, 2019 10:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jgreenyer/ad3a5ec6c514b7e8bc1fbfba92347da5 to your computer and use it in GitHub Desktop.
Save jgreenyer/ad3a5ec6c514b7e8bc1fbfba92347da5 to your computer and use it in GitHub Desktop.
Scenario-based programming in Kotlin using coroutines and channels -- minimal example
package scenarioprogramming
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
/**
 * An event identified by a human-readable [name].
 *
 * The class is open so that concrete events can be declared as subclasses or objects.
 * It does not override `equals`, so two events are equal only if they are the same instance.
 */
open class Event(val name: String)
/** Closed base type of all messages that are sent over the synchronization channel. */
sealed class AbstractSyncMessage

/** Singleton message that signals termination; it carries no payload. */
object TerminationMessage : AbstractSyncMessage()
/**
 * Message describing one synchronization point of a scenario.
 *
 * @property senderScenario the scenario that sent this message.
 * @property requestedEvents the events the sender requests.
 * @property waitedForEvents the events the sender waits for without requesting them.
 */
class SyncMessage(
    val senderScenario: Scenario,
    val requestedEvents: Collection<Event>,
    val waitedForEvents: Collection<Event>
) : AbstractSyncMessage()
/**
 * Runs a collection of scenarios in lock-step.
 *
 * Every scenario is started in its own coroutine and communicates with this program through
 * one shared synchronization channel. In each round the program waits until every scenario it
 * is currently awaiting has sent a message, selects one requested event, and sends that event
 * to every scenario that requested or waited for it.
 *
 * @property scenarios the scenario bodies to execute; each is invoked on a fresh [Scenario].
 */
class ScenarioProgram(val scenarios: Collection<suspend Scenario.() -> Unit>) {
    private val syncChannel = Channel<AbstractSyncMessage>()

    // Number of messages that must arrive before the next event may be selected.
    // Initially every scenario is awaited; afterwards only the scenarios that were just notified.
    private var numberOfScenariosAwaitedToReachSyncPoint = scenarios.size

    private val scenariosToRequestedEvents = mutableMapOf<Scenario, Collection<Event>>()
    private val scenariosToRequestedOrWaitedForEvents = mutableMapOf<Scenario, Collection<Event>>()

    /**
     * Launches all scenarios and executes the select-and-notify loop.
     *
     * Returns as soon as all awaited scenarios have reported and none of the recorded
     * scenarios requests an event.
     */
    suspend fun run() {
        launchScenarios()
        while (true) {
            receiveSyncMessagesFromAllRunningScenarios()
            val selectedEvent = selectRequestedEvent(scenariosToRequestedEvents.values.flatten()) ?: break
            notifyScenarios(selectedEvent)
        }
    }

    /**
     * Starts one coroutine per scenario body.
     *
     * The termination message is sent from a `finally` block: a scenario body that throws must
     * still report, otherwise [run] would suspend forever waiting for its message.
     */
    // NOTE(review): coroutines are launched in GlobalScope, so a scenario that is never notified
    // again stays suspended after run() returns. Moving to structured concurrency would change
    // when run() returns and needs a separate decision.
    private fun launchScenarios() = scenarios.forEach { scenarioBody ->
        GlobalScope.launch {
            val scenario = Scenario(syncChannel)
            try {
                scenarioBody(scenario)
            } finally {
                scenario.terminate()
            }
        }
    }

    /**
     * Receives exactly one message from each scenario that is currently awaited and records
     * the requested and waited-for events of those that reached a synchronization point.
     */
    private suspend fun receiveSyncMessagesFromAllRunningScenarios() {
        repeat(numberOfScenariosAwaitedToReachSyncPoint) {
            when (val syncMessage = syncChannel.receive()) {
                is SyncMessage -> {
                    scenariosToRequestedEvents[syncMessage.senderScenario] = syncMessage.requestedEvents
                    scenariosToRequestedOrWaitedForEvents[syncMessage.senderScenario] =
                        syncMessage.requestedEvents + syncMessage.waitedForEvents
                }
                // Nothing to record: the message only counts towards the awaited total.
                is TerminationMessage -> {}
            }
        }
        numberOfScenariosAwaitedToReachSyncPoint = 0
    }

    /** Selects the first of the [requestedEvents], or `null` if there is none. */
    private fun selectRequestedEvent(requestedEvents: Collection<Event>) = requestedEvents.firstOrNull()

    /**
     * Sends [event] to every scenario that requested or waited for it, removes the recorded
     * events of those scenarios, and counts them as awaited for the next round.
     */
    private suspend fun notifyScenarios(event: Event) {
        val entries = scenariosToRequestedOrWaitedForEvents.iterator()
        while (entries.hasNext()) {
            // Destructure before removing: a map entry must not be read after its removal.
            val (scenario, events) = entries.next()
            if (event in events) {
                scenario.notifyOfEventChannel.send(event)
                entries.remove()
                scenariosToRequestedEvents.remove(scenario)
                numberOfScenariosAwaitedToReachSyncPoint++
            }
        }
    }
}
/**
 * Handle through which a scenario body communicates.
 *
 * @property syncChannel channel on which this scenario sends its messages.
 */
class Scenario(val syncChannel: Channel<AbstractSyncMessage>) {
    /** Channel from which [sync] receives the event it returns. */
    val notifyOfEventChannel = Channel<Event>()

    /** Sends a [TerminationMessage] on [syncChannel]. */
    suspend fun terminate() {
        syncChannel.send(TerminationMessage)
    }

    /**
     * Sends a [SyncMessage] with the given events on [syncChannel] and then suspends until
     * an event arrives on [notifyOfEventChannel].
     *
     * @param requestedEvents events this scenario requests.
     * @param waitedForEvents events this scenario waits for.
     * @return the event received from [notifyOfEventChannel].
     */
    suspend fun sync(requestedEvents: Collection<Event>, waitedForEvents: Collection<Event>): Event {
        val announcement = SyncMessage(this, requestedEvents, waitedForEvents)
        syncChannel.send(announcement)
        return notifyOfEventChannel.receive()
    }
}
/** Example event named "A". */
object A : Event(name = "A")

/** Example event named "B". */
object B : Event(name = "B")
/**
 * Minimal example: two scenarios request A twice and B twice respectively, while a third
 * waits for the sequence A, B, A, B and prints "success" once it has seen all four events.
 */
fun main() {
    val noEvents: Set<Event> = emptySet()

    // Requests A at two consecutive synchronization points.
    val twoAs: suspend Scenario.() -> Unit = {
        repeat(2) { sync(setOf(A), noEvents) }
    }

    // Requests B at two consecutive synchronization points.
    val twoBs: suspend Scenario.() -> Unit = {
        repeat(2) { sync(setOf(B), noEvents) }
    }

    // Only waits; requests nothing itself.
    val expectABAB: suspend Scenario.() -> Unit = {
        for (expected in listOf(A, B, A, B)) {
            sync(noEvents, setOf(expected))
        }
        println("success")
    }

    runBlocking {
        launch {
            val program = ScenarioProgram(setOf(twoAs, twoBs, expectABAB))
            program.run()
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment