Last active
February 22, 2019 10:31
-
-
Save jgreenyer/ad3a5ec6c514b7e8bc1fbfba92347da5 to your computer and use it in GitHub Desktop.
Scenario-based programming in Kotlin using coroutines and channels -- minimal example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scenarioprogramming | |
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
/**
 * An event that can appear in the requested or waited-for collections of a sync message.
 *
 * The class is `open` so that concrete events can be declared as subclasses or objects.
 *
 * @property name label identifying the event
 */
open class Event(
    val name: String
)
/**
 * Base type of every message a scenario sends over the synchronization channel.
 *
 * Being sealed, the set of message kinds is closed, which lets `when` expressions
 * over it be checked for exhaustiveness by the compiler.
 */
sealed class AbstractSyncMessage

/** Singleton message announcing that the sending scenario has finished. */
object TerminationMessage : AbstractSyncMessage()
/**
 * Message a scenario sends when it reaches a synchronization point.
 *
 * @property senderScenario the scenario that sent this message
 * @property requestedEvents events the sender requests
 * @property waitedForEvents events the sender waits for without requesting them
 */
class SyncMessage(
    val senderScenario: Scenario,
    val requestedEvents: Collection<Event>,
    val waitedForEvents: Collection<Event>
) : AbstractSyncMessage()
/**
 * Runs a collection of scenarios and coordinates them through synchronization points.
 *
 * Every scenario runs in its own coroutine and reports to this program over a shared
 * channel. Once all running scenarios have reported, the program selects one requested
 * event and notifies every scenario that requested or waited for it; those scenarios
 * then continue to their next synchronization point.
 *
 * @property scenarios the scenario bodies to execute; each receives its own [Scenario] as receiver
 */
class ScenarioProgram(val scenarios: Collection<suspend Scenario.() -> Unit>) {

    /** Channel on which all scenarios send their sync and termination messages. */
    private val syncChannel = Channel<AbstractSyncMessage>()

    /** Number of messages to receive before the next event can be selected. */
    private var numberOfScenariosAwaitedToReachSyncPoint = scenarios.size

    /** Requested events of every scenario currently suspended at a sync point. */
    private val scenariosToRequestedEvents = mutableMapOf<Scenario, Collection<Event>>()

    /** Requested plus waited-for events of every scenario currently suspended at a sync point. */
    private val scenariosToRequestedOrWaitedForEvents = mutableMapOf<Scenario, Collection<Event>>()

    /**
     * Launches all scenarios and drives the select-and-notify loop until no scenario
     * requests an event any more.
     *
     * The scenarios are launched as children of a scope owned by this call, so a failing
     * scenario fails this call instead of leaving it suspended on the sync channel.
     * Scenarios still suspended at a sync point when the loop ends are cancelled before
     * this function returns.
     */
    suspend fun run() {
        // Start from a clean state so that the program can be run more than once.
        numberOfScenariosAwaitedToReachSyncPoint = scenarios.size
        scenariosToRequestedEvents.clear()
        scenariosToRequestedOrWaitedForEvents.clear()

        coroutineScope {
            val scenarioJobs = scenarios.map { scenarioBody ->
                launch {
                    val scenario = Scenario(syncChannel)
                    scenarioBody.invoke(scenario)
                    scenario.terminate()
                }
            }

            while (true) {
                receiveSyncMessagesFromAllRunningScenarios()
                val selectedEvent =
                    selectRequestedEvent(scenariosToRequestedEvents.values.flatten()) ?: break
                notifyScenarios(selectedEvent)
            }

            // No event is requested any more: scenarios that are still waiting can never
            // be resumed, so cancel them rather than leaving them suspended.
            scenarioJobs.forEach { it.cancel() }
        }
    }

    /**
     * Receives one message from each scenario that is currently running and records the
     * requested and waited-for events of those that reached a sync point.
     */
    private suspend fun receiveSyncMessagesFromAllRunningScenarios() {
        repeat(numberOfScenariosAwaitedToReachSyncPoint) {
            when (val syncMessage = syncChannel.receive()) {
                is SyncMessage -> {
                    val sender = syncMessage.senderScenario
                    scenariosToRequestedEvents[sender] = syncMessage.requestedEvents
                    scenariosToRequestedOrWaitedForEvents[sender] =
                        setOf(syncMessage.requestedEvents, syncMessage.waitedForEvents).flatten()
                }
                // A terminated scenario simply stops being tracked.
                is TerminationMessage -> {}
            }
        }
        numberOfScenariosAwaitedToReachSyncPoint = 0
    }

    /** Returns the first of the [requestedEvents], or `null` when there is none. */
    private fun selectRequestedEvent(requestedEvents: Collection<Event>): Event? =
        requestedEvents.firstOrNull()

    /**
     * Sends [event] to every scenario that requested or waited for it and stops tracking
     * those scenarios until they report again.
     */
    private suspend fun notifyScenarios(event: Event) {
        val entries = scenariosToRequestedOrWaitedForEvents.entries.iterator()
        while (entries.hasNext()) {
            val (scenario, events) = entries.next()
            if (event !in events) continue

            scenario.notifyOfEventChannel.send(event)
            // Remove through the iterator to avoid a concurrent modification of the map.
            entries.remove()
            scenariosToRequestedEvents.remove(scenario)
            // The resumed scenario will send one more message (sync or termination).
            numberOfScenariosAwaitedToReachSyncPoint++
        }
    }
}
/**
 * Receiver object handed to a scenario body; provides the operations a scenario uses
 * to communicate over the sync channel.
 *
 * @property syncChannel channel on which this scenario sends its sync and termination messages
 */
class Scenario(val syncChannel: Channel<AbstractSyncMessage>) {

    /** Channel from which this scenario receives the event that resumes it. */
    val notifyOfEventChannel = Channel<Event>()

    /** Sends a [TerminationMessage] on the sync channel. */
    suspend fun terminate() {
        syncChannel.send(TerminationMessage)
    }

    /**
     * Sends a [SyncMessage] carrying the given events and suspends until an event is
     * received on [notifyOfEventChannel].
     *
     * @param requestedEvents events this scenario requests
     * @param waitedForEvents events this scenario waits for without requesting them
     * @return the event received on [notifyOfEventChannel]
     */
    suspend fun sync(requestedEvents: Collection<Event>, waitedForEvents: Collection<Event>): Event {
        val message = SyncMessage(this, requestedEvents, waitedForEvents)
        syncChannel.send(message)
        return notifyOfEventChannel.receive()
    }
}
/** Example event named "A". */
object A : Event(name = "A")

/** Example event named "B". */
object B : Event(name = "B")
/**
 * Minimal example: two scenarios request the events A and B twice each, while a third
 * waits for the sequence A, B, A, B and prints "success" once it has observed it.
 */
fun main() {
    // Requests A at two consecutive sync points.
    val requestTwoAs: suspend Scenario.() -> Unit = {
        repeat(2) { sync(setOf(A), emptySet()) }
    }

    // Requests B at two consecutive sync points.
    val requestTwoBs: suspend Scenario.() -> Unit = {
        repeat(2) { sync(setOf(B), emptySet()) }
    }

    // Only waits (never requests) for A, B, A, B in that order.
    val waitForABAB: suspend Scenario.() -> Unit = {
        for (expected in listOf(A, B, A, B)) {
            sync(emptySet(), setOf(expected))
        }
        println("success")
    }

    val program = ScenarioProgram(setOf(requestTwoAs, requestTwoBs, waitForABAB))
    runBlocking {
        program.run()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment