Skip to content

Instantly share code, notes, and snippets.

@EricDw
Created November 21, 2018 02:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save EricDw/f754ba2c4a041dffc97a2eb880751e4d to your computer and use it in GitHub Desktop.
Save EricDw/f754ba2c4a041dffc97a2eb880751e4d to your computer and use it in GitHub Desktop.
A small DSL around Kotlin's Actor Coroutine.
package com.publicmethod.domain.pipelines
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.selects.select
/**
 * A two-ended channel: values of type [I] are sent in through the [SendChannel] side and
 * values of type [O] are received from the [ReceiveChannel] side.
 *
 * The interface declares no members of its own; it only combines the two channel contracts.
 */
interface Pipeline<I, O> : SendChannel<I>, ReceiveChannel<O>
/**
 * Receiver for the pipeline DSL block.
 *
 * Exposes everything an [ActorScope] offers (by delegating to [actorScope]) and adds two
 * helpers: [route] to start a select loop and [send] to publish a value downstream.
 *
 * @param actorScope scope of the actor that backs the pipeline's input side.
 * @param outputChannel channel that [send] writes produced values to.
 */
open class PipelineBuilder<I, O>(
    actorScope: ActorScope<I>,
    private val outputChannel: Channel<O>
) : ActorScope<I> by actorScope {

    // Job returned by the most recent [route] call. Nothing in this class reads it.
    // NOTE(review): a second call to route overwrites this reference without cancelling
    // the earlier job — confirm whether multiple concurrent routes are intended.
    private lateinit var router: Job

    /**
     * Starts a select loop in this scope using the clauses registered by [f]
     * and remembers the resulting job.
     */
    fun route(f: SelectBuilder<Unit>.() -> Unit) {
        selector(f).also { job -> router = job }
    }

    /**
     * Evaluates [o] and sends its result to the output channel, suspending if the
     * channel cannot accept the value immediately.
     */
    suspend fun send(o: () -> O) {
        val produced = o()
        outputChannel.send(produced)
    }
}
/**
 * A [PipelineBuilder] that additionally carries a mutable [state] value of the output type.
 *
 * @param actorScope scope of the actor that backs the pipeline's input side.
 * @param initialState value [state] holds until it is reassigned.
 * @param outputChannel channel that produced values are written to.
 */
class StatefulPipelineBuilder<I, O>(
    actorScope: ActorScope<I>,
    initialState: O,
    outputChannel: Channel<O>
) : PipelineBuilder<I, O>(actorScope, outputChannel) {

    /** Current state; starts as the `initialState` constructor argument and is freely reassignable. */
    var state: O = initialState
}
/**
 * Creates a [Pipeline] whose input side is an actor launched in this scope and whose
 * output side is a freshly created channel.
 *
 * Both the actor mailbox and the output channel are created with [Channel.UNLIMITED] capacity.
 * The [init] block runs inside the actor coroutine with a [PipelineBuilder] as receiver.
 *
 * Note: this function never closes the output channel itself.
 *
 * @param init DSL block that configures the pipeline (typically via `route` / `send`).
 * @return an object that sends to the actor and receives from the output channel.
 */
fun <I, O> CoroutineScope.pipeline(init: suspend PipelineBuilder<I, O>.() -> Unit): Pipeline<I, O> {
    val outputChannel = Channel<O>(Channel.UNLIMITED)
    val inputChannel: SendChannel<I> = actor<I>(
        context = coroutineContext,
        capacity = Channel.UNLIMITED
    ) {
        // The builder is only ever used inside the actor body, so it is created here
        // instead of being published through a captured `lateinit` local that nothing read.
        PipelineBuilder<I, O>(this, outputChannel).init()
    }
    return object : Pipeline<I, O>,
        SendChannel<I> by inputChannel,
        ReceiveChannel<O> by outputChannel {}
}
/**
 * Creates a [Pipeline] like `pipeline`, but runs [init] against a [StatefulPipelineBuilder]
 * so the DSL block can read and update a `state` value.
 *
 * Both the actor mailbox and the output channel are created with [Channel.UNLIMITED] capacity.
 *
 * Note: this function never closes the output channel itself.
 *
 * @param initialState starting value of the builder's `state` property.
 * @param init DSL block that configures the pipeline; runs inside the actor coroutine.
 * @return an object that sends to the actor and receives from the output channel.
 */
fun <I, O> CoroutineScope.statefulPipeline(
    initialState: O,
    init: suspend StatefulPipelineBuilder<I, O>.() -> Unit
): Pipeline<I, O> {
    val outputChannel = Channel<O>(Channel.UNLIMITED)
    val inputChannel: SendChannel<I> = actor<I>(
        context = coroutineContext,
        capacity = Channel.UNLIMITED
    ) {
        // The builder is only ever used inside the actor body, so it is created here
        // instead of being published through a captured `lateinit` local that nothing read.
        StatefulPipelineBuilder<I, O>(
            this,
            initialState,
            outputChannel
        ).init()
    }
    return object : Pipeline<I, O>,
        SendChannel<I> by inputChannel,
        ReceiveChannel<O> by outputChannel {}
}
/**
 * Launches a coroutine in this scope that repeatedly evaluates a `select` expression
 * built from [builder] for as long as the launched coroutine is active.
 *
 * @param builder registers the select clauses; it is re-applied on every iteration.
 * @return the [Job] of the launched loop.
 */
fun CoroutineScope.selector(builder: SelectBuilder<Unit>.() -> Unit): Job {
    return launch(coroutineContext) {
        // `isActive` here is the launched coroutine's own status, so the loop exits
        // once that coroutine is cancelled.
        while (isActive) {
            select<Unit>(builder)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment