Skip to content

Instantly share code, notes, and snippets.

@iravid
Created September 30, 2017 18:27
Show Gist options
  • Save iravid/6a4d8db9d9c361ed500d558e64388f5f to your computer and use it in GitHub Desktop.
Save iravid/6a4d8db9d9c361ed500d558e64388f5f to your computer and use it in GitHub Desktop.
// Current OOP-style class
/**
 * OOP-style search service that owns a [[GraphService]] and an [[IndexService]].
 *
 * Both collaborators are instantiated when this class is constructed and must be
 * released by calling [[shutdown]] once the service is no longer needed.
 *
 * @param ec implicit execution context available to the `Future`-returning members
 */
class SearchService(implicit ec: ExecutionContext) {
  // these things need to be shut down
  val db: GraphService = new GraphService
  val index: IndexService = new IndexService

  /**
   * Searches for classes matching `query`.
   *
   * @param query the search string
   * @return the matching symbols, asynchronously
   */
  def searchClasses(query: String): Future[List[FqnSymbol]] =
    // do things with both db and index
    ??? // placeholder so the sketch parses; the real implementation is still to be written

  /**
   * Shuts down `db` and `index`.
   *
   * NOTE(review): `>>` is not defined on `scala.concurrent.Future` itself; presumably it comes
   * from cats/scalaz syntax imported outside this view — confirm. Also confirm whether its
   * right-hand side is evaluated lazily; if it is strict, both shutdowns are started at once
   * rather than one after the other.
   */
  def shutdown: Future[Unit] =
    db.shutdown >> index.shutdown
}
/**
 * Stream-based sketch of the search service: callers enqueue [[StreamService.Request]]s,
 * and a background stream dequeues them and passes each one to `handleRequest`.
 */
object StreamService {

  /** Messages accepted by the service's input queue. */
  sealed trait Request
  object Request {
    /** Request to search for classes matching `query`. */
    final case class SearchClasses(query: String) extends Request
  }

  /**
   * Handle given to clients of a running service.
   *
   * The type arguments mirror what `run` constructs: a queue of [[Request]]s and a
   * `Boolean` signal, both in `Task`.
   *
   * @param queue      the input queue that `run` dequeues requests from
   * @param signal     the signal `run` passes to `interruptWhen` on the processing stream
   * @param completion the task produced by `Task.start` for the processing stream
   */
  final case class ServiceInterface(
    queue: fs2.async.mutable.Queue[Task, Request],
    signal: fs2.async.mutable.Signal[Task, Boolean],
    completion: Task[Unit]
  ) {
    /**
     * Searches for classes matching `query`.
     *
     * @param query the search string
     * @return the matching symbols
     */
    def searchClasses(query: String): Task[List[FqnSymbol]] =
      // enqueue SearchClasses(query) into the queue and somehow get the result back as a task
      ??? // placeholder so the sketch parses; the reply mechanism is still an open design question
  }

  /**
   * Processes a single request taken from the input queue.
   *
   * @param req the request to handle
   */
  def handleRequest(req: Request): Task[Unit] = req match {
    // pattern match each request and somehow get the result back to the caller
    case Request.SearchClasses(_) => ??? // placeholder; see the open question above
  }

  /** The services a running [[StreamService]] needs. */
  final case class Dependencies(index: IndexService, db: GraphService)

  /**
   * Creates the input queue and the shutdown signal (initially `false`), starts the
   * request-processing stream, and returns a [[ServiceInterface]] wrapping all three.
   *
   * NOTE(review): `deps` is not used yet — `handleRequest` takes only the request; confirm how
   * the dependencies are meant to reach it.
   * NOTE(review): `fs2.async.unboundedQueue`, `signalOf` and `Task.start` presumably need an
   * implicit `Strategy` in scope (fs2 0.9) — confirm it is provided outside this view.
   *
   * @param deps the index and graph services
   */
  def run(deps: Dependencies): Task[ServiceInterface] =
    for {
      inputQueue     <- fs2.async.unboundedQueue[Task, Request]
      shutdownSignal <- fs2.async.signalOf[Task, Boolean](false)
      completion     <- Task.start {
                          inputQueue.dequeue
                            .evalMap(handleRequest)
                            .interruptWhen(shutdownSignal)
                            .run
                        }
    } yield ServiceInterface(inputQueue, shutdownSignal, completion)

  /** Entry point intended to set up the dependencies and run the service. */
  def apply(): Task[ServiceInterface] =
    // do the bracketing here - setup the deps, Stream.eval(run)
    ??? // placeholder so the sketch parses; resource bracketing is still to be written
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment