Last active
March 13, 2023 11:30
-
-
Save vicety/8a8a69a9988339929dec4f25be1cfb15 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package portals.examples | |
import java.util.concurrent.LinkedBlockingQueue | |
import scala.annotation.experimental | |
import portals.api.builder.ApplicationBuilder | |
import portals.api.builder.TaskBuilder | |
import portals.application.task.PerKeyState | |
import portals.application.task.PerTaskState | |
import portals.application.task.StatefulTaskContext | |
import portals.application.ASTPrinter | |
import portals.application.Application | |
import portals.application.AtomicPortalRef | |
import portals.system.Systems | |
import portals.util.Future | |
object Events: | |
////////////////////////////////////////////////////////////////////////////// | |
// Query | |
////////////////////////////////////////////////////////////////////////////// | |
case class Query(workflow: String, sql: String) | |
case class QueryResult(success: Boolean, message: String, rows: List[Tuple]) | |
def keyFrom(query: Query): Long = query.workflow.hashCode() | |
////////////////////////////////////////////////////////////////////////////// | |
// Events | |
////////////////////////////////////////////////////////////////////////////// | |
sealed trait Event | |
object Tasks: | |
import portals.api.dsl.DSL.* | |
import portals.api.dsl.ExperimentalDSL.* | |
import Events.* | |
////////////////////////////////////////////////////////////////////////////// | |
// AccountTask | |
////////////////////////////////////////////////////////////////////////////// | |
val replierState: StatefulTaskContext ?=> PerTaskState[Int] = PerTaskState("bankAccountState", 1) | |
def accountTask(portals: AtomicPortalRef[Query, QueryResult]) = | |
TaskBuilder.replier[Nothing, Nothing, Query, QueryResult](portals)(_ => ()) { query => | |
ctx.log.info("bank-account-replier: " + query.toString()) | |
replierState.set(replierState.get() + 1) | |
reply(QueryResult(true, replierState.get().toString, List())) | |
} | |
////////////////////////////////////////////////////////////////////////////// | |
// QueryTask | |
////////////////////////////////////////////////////////////////////////////// | |
def queryTask(portals: AtomicPortalRef[Query, QueryResult]) = | |
TaskBuilder.asker[Query, Nothing, Query, QueryResult](portals) { query => | |
ctx.log.info("queryTask: " + query.toString()) | |
val future: Future[QueryResult] = ask(portals)(query) | |
future.await { | |
ctx.log.info("queryTask got reply: " + future.value.get) | |
} | |
} | |
@experimental | |
object Queryable: | |
import portals.api.builder.TaskBuilder | |
import portals.api.dsl.DSL.* | |
import portals.api.dsl.ExperimentalDSL.* | |
import Events.* | |
val app: Application = PortalsApp("any") { | |
//////////////////////////////////////////////////////////////////////////// | |
// Bank Account | |
//////////////////////////////////////////////////////////////////////////// | |
val bankAccountPortal = Portal[Query, QueryResult]("bankAccountPortal", keyFrom) | |
val generator = Generators.fromList(List[Event]()) | |
val wf = Workflows[Nothing, Nothing]("bankAccount") | |
.source(Generators.empty.stream) | |
.task[Nothing](Tasks.accountTask(bankAccountPortal)) | |
.withName("bankAccountReplier") | |
.sink() | |
.freeze() | |
//////////////////////////////////////////////////////////////////////////// | |
// Query Origin | |
//////////////////////////////////////////////////////////////////////////// | |
val queries = Generators.fromIterator[Query](REPL.iterator) | |
val queryWorkflow = Workflows[Query, Nothing]("queryWorkflow") | |
.source(queries.stream) | |
.task(Tasks.queryTask(bankAccountPortal)) | |
.sink() | |
.freeze() | |
} | |
object REPL: | |
import Events.* | |
class BlockingQueueIterator[T] extends Iterator[T] { | |
val queue = new LinkedBlockingQueue[T]() | |
override def hasNext: Boolean = !queue.isEmpty | |
override def next(): T = { | |
queue.take() | |
} | |
def enqueue(e: T) = queue.offer(e) | |
} | |
val iterator = new BlockingQueueIterator[Query]() | |
val waitForMsg = new LinkedBlockingQueue[Int]() | |
val waitForProcessing = new LinkedBlockingQueue[Int]() | |
def start(): Unit = { | |
new Thread { | |
override def run(): Unit = { | |
while (true) { | |
val line = scala.io.StdIn.readLine("prompt> ") | |
iterator.enqueue(Query("test-wf", line)) | |
waitForMsg.offer(0) | |
waitForProcessing.take() | |
} | |
} | |
}.start() | |
} | |
@experimental | |
object QueryableMain extends App: | |
val system = Systems.interpreter() | |
system.launch(Queryable.app) | |
REPL.start() | |
new Thread() { | |
override def run(): Unit = { | |
while (true) { | |
REPL.waitForMsg.take() | |
system.stepUntilComplete() | |
REPL.waitForProcessing.offer(0) | |
} | |
} | |
}.start() | |
system.shutdown() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment