Skip to content

Instantly share code, notes, and snippets.

@vicety
Last active March 13, 2023 11:30
Show Gist options
  • Save vicety/8a8a69a9988339929dec4f25be1cfb15 to your computer and use it in GitHub Desktop.
Save vicety/8a8a69a9988339929dec4f25be1cfb15 to your computer and use it in GitHub Desktop.
package portals.examples
import java.util.concurrent.LinkedBlockingQueue
import scala.annotation.experimental
import portals.api.builder.ApplicationBuilder
import portals.api.builder.TaskBuilder
import portals.application.task.PerKeyState
import portals.application.task.PerTaskState
import portals.application.task.StatefulTaskContext
import portals.application.ASTPrinter
import portals.application.Application
import portals.application.AtomicPortalRef
import portals.system.Systems
import portals.util.Future
/** Message types exchanged over the portal, plus the function that derives a key from a query. */
object Events:
  //////////////////////////////////////////////////////////////////////////////
  // Query
  //////////////////////////////////////////////////////////////////////////////

  /** A request sent over the portal.
    *
    * @param workflow workflow name; [[keyFrom]] hashes it to obtain the key
    * @param sql      the query string
    */
  case class Query(workflow: String, sql: String)

  /** The reply to a [[Query]].
    *
    * @param success whether the query was handled successfully
    * @param message free-form text accompanying the result
    * @param rows    result rows, one tuple per row
    */
  case class QueryResult(success: Boolean, message: String, rows: List[Tuple])

  /** Computes the key of a query: the hash code of its workflow name, widened to `Long`. */
  def keyFrom(query: Query): Long =
    val workflowHash: Int = query.workflow.hashCode()
    workflowHash.toLong

  //////////////////////////////////////////////////////////////////////////////
  // Events
  //////////////////////////////////////////////////////////////////////////////

  /** Base trait for events; no subtypes are declared in this object. */
  sealed trait Event
/** Task definitions for the two ends of the portal: a replier and an asker. */
object Tasks:
  import portals.api.dsl.DSL.*
  import portals.api.dsl.ExperimentalDSL.*
  import Events.*

  //////////////////////////////////////////////////////////////////////////////
  // AccountTask
  //////////////////////////////////////////////////////////////////////////////

  /** Per-task `Int` state named "bankAccountState", resolved against whichever
    * `StatefulTaskContext` is in scope at the use site.
    * NOTE(review): the second argument (1) is presumably the initial value -- confirm.
    */
  val replierState: StatefulTaskContext ?=> PerTaskState[Int] =
    PerTaskState("bankAccountState", 1)

  /** Builds the replier task for the given portal.
    *
    * The regular-event handler is a no-op. For every incoming [[Query]] the task
    * logs it, increments the counter held in [[replierState]], and replies with a
    * successful [[QueryResult]] whose message is the new counter value and whose
    * row list is empty.
    *
    * @param portals the portal on which requests are received and replies are sent
    */
  def accountTask(portals: AtomicPortalRef[Query, QueryResult]) =
    TaskBuilder.replier[Nothing, Nothing, Query, QueryResult](portals)(_ => ()) { query =>
      ctx.log.info(s"bank-account-replier: $query")
      val incremented = replierState.get() + 1
      replierState.set(incremented)
      reply(QueryResult(success = true, message = incremented.toString, rows = Nil))
    }

  //////////////////////////////////////////////////////////////////////////////
  // QueryTask
  //////////////////////////////////////////////////////////////////////////////

  /** Builds the asker task for the given portal.
    *
    * Each incoming [[Query]] is logged and forwarded through the portal with
    * `ask`; the reply is logged once the returned future is awaited.
    *
    * @param portals the portal the queries are sent to
    */
  def queryTask(portals: AtomicPortalRef[Query, QueryResult]) =
    TaskBuilder.asker[Query, Nothing, Query, QueryResult](portals) { query =>
      ctx.log.info(s"queryTask: $query")
      val pendingReply: Future[QueryResult] = ask(portals)(query)
      pendingReply.await {
        // Read inside the await continuation, exactly where the original read it.
        ctx.log.info(s"queryTask got reply: ${pendingReply.value.get}")
      }
    }
/** Assembles the example application: one portal, a replier workflow attached to
  * it, and a query workflow that feeds REPL input through the asker task.
  */
@experimental
object Queryable:
  import portals.api.builder.TaskBuilder
  import portals.api.dsl.DSL.*
  import portals.api.dsl.ExperimentalDSL.*
  import Events.*

  /** The application, built under the name "any". */
  val app: Application = PortalsApp("any") {
    ////////////////////////////////////////////////////////////////////////////
    // Bank Account
    ////////////////////////////////////////////////////////////////////////////

    // Portal carrying Query requests and QueryResult replies, keyed via Events.keyFrom.
    val accountPortal = Portal[Query, QueryResult]("bankAccountPortal", keyFrom)

    // Not referenced by either workflow below. Kept as in the original because
    // creating a generator inside the app block may register it with the
    // application -- TODO confirm before removing.
    val unusedEventGenerator = Generators.fromList(List.empty[Event])

    // Replier side: sourced from an empty generator stream and run through
    // Tasks.accountTask on the portal.
    val replierWorkflow = Workflows[Nothing, Nothing]("bankAccount")
      .source(Generators.empty.stream)
      .task[Nothing](Tasks.accountTask(accountPortal))
      .withName("bankAccountReplier")
      .sink()
      .freeze()

    ////////////////////////////////////////////////////////////////////////////
    // Query Origin
    ////////////////////////////////////////////////////////////////////////////

    // Asker side: queries come from the REPL-backed iterator.
    val replQueries = Generators.fromIterator[Query](REPL.iterator)

    val askerWorkflow = Workflows[Query, Nothing]("queryWorkflow")
      .source(replQueries.stream)
      .task(Tasks.queryTask(accountPortal))
      .sink()
      .freeze()
  }
/** Bridges standard input to the query workflow.
  *
  * Lines read from stdin are wrapped in [[Events.Query]] values and placed on
  * [[iterator]]. Two queues hand control back and forth with whoever drives the
  * system: a token on [[waitForMsg]] announces that a query was enqueued, and the
  * reader then blocks until a token arrives on [[waitForProcessing]].
  */
object REPL:
  import Events.*

  /** An `Iterator` backed by a `LinkedBlockingQueue`.
    *
    * `hasNext` does not block: it reports whether an element is queued right now.
    * `next()` uses `take()` and therefore blocks while the queue is empty.
    */
  class BlockingQueueIterator[T] extends Iterator[T] {
    val queue: LinkedBlockingQueue[T] = new LinkedBlockingQueue[T]()

    override def hasNext: Boolean = !queue.isEmpty

    override def next(): T = queue.take()

    /** Appends `e` to the queue; returns the result of `LinkedBlockingQueue.offer`. */
    def enqueue(e: T): Boolean = queue.offer(e)
  }

  /** Queries typed at the prompt, in arrival order. */
  val iterator: BlockingQueueIterator[Query] = new BlockingQueueIterator[Query]()

  /** Receives one token per enqueued query. */
  val waitForMsg: LinkedBlockingQueue[Int] = new LinkedBlockingQueue[Int]()

  /** The reader thread takes one token from here before showing the next prompt. */
  val waitForProcessing: LinkedBlockingQueue[Int] = new LinkedBlockingQueue[Int]()

  /** Starts the thread that reads the prompt.
    *
    * For every line it enqueues `Query("test-wf", line)`, signals [[waitForMsg]],
    * and waits on [[waitForProcessing]]. Reading stops at end of input:
    * `readLine` returns `null` there, and without this check the loop would keep
    * enqueueing queries with a `null` sql string forever.
    */
  def start(): Unit = {
    val reader = new Thread {
      override def run(): Unit = {
        Iterator
          .continually(scala.io.StdIn.readLine("prompt> "))
          .takeWhile(_ != null) // null marks end of the input stream
          .foreach { line =>
            iterator.enqueue(Query("test-wf", line))
            waitForMsg.offer(0)
            waitForProcessing.take()
          }
      }
    }
    reader.start()
  }
/** Entry point: launches [[Queryable.app]] on an interpreter system and processes
  * one REPL query at a time.
  *
  * The processing thread waits for a token on `REPL.waitForMsg`, steps the system
  * until it completes, then puts a token on `REPL.waitForProcessing` so the REPL
  * can prompt again.
  *
  * The system is shut down only when that loop stops. Previously
  * `system.shutdown()` ran on the main thread immediately after the processing
  * thread was started, i.e. before any query had been stepped.
  *
  * NOTE(review): `extends App` is kept so `QueryableMain.system` stays a member;
  * consider an explicit `main` method if initialization order becomes an issue.
  */
@experimental
object QueryableMain extends App:
  val system = Systems.interpreter()
  system.launch(Queryable.app)
  REPL.start()

  new Thread() {
    override def run(): Unit = {
      try {
        while (true) {
          REPL.waitForMsg.take()
          system.stepUntilComplete()
          REPL.waitForProcessing.offer(0)
        }
      } finally {
        // Reached only if the loop is aborted (e.g. interrupt or a failure while stepping).
        system.shutdown()
      }
    }
  }.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment