Skip to content

Instantly share code, notes, and snippets.

@matfournier
Last active November 11, 2019 19:33
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save matfournier/7e76330ae7970447cc249b46c2dca23e to your computer and use it in GitHub Desktop.
Monix Observable Help
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
import scala.concurrent._
import scala.concurrent.duration._
// possibly useful
// https://github.com/monix/monix/issues/481
// https://stackoverflow.com/questions/45894205/getting-cassandra-query-results-asynchronously-using-scala-monix
// Note: one of the linked issues claims that flatMap is not stack-safe. This still needs to be
// tested; if it is true, it is a problem for this design.
/** Walks the paged parent API exposed by `pf`, looks up the child of every parent that has
  * one through `cf`, and collects all resulting pages into a single list.
  *
  * @param pf source of parent pages, addressed by page number
  * @param cf source of child pages, addressed by child id
  */
class Go(pf: ParentFetch, cf: ChildFetch) {

  /** Builds the task that fetches parent pages starting at page 1 and follows `Next` cursors
    * until a `Finished` result has been emitted.
    *
    * For every parent that references a child, the child is fetched and emitted immediately
    * before that parent. At most 300 elements are taken from the stream. Because each element
    * is prepended to the accumulator, the returned list is in reverse emission order.
    *
    * A failed parent or child fetch fails the returned task.
    */
  def go: Task[List[Page]] = {
    // Unfold the paged parent API. The state is the task that fetches the next page result;
    // `onNextPage` emits the fetched result and supplies the task for the following one.
    val parentResults: Observable[PageResult[Page]] =
      Observable
        .fromAsyncStateAction[Task[PageResult[Page]], PageResult[Page]](_.flatMap(onNextPage))(pf.get(1))
        .takeWhileInclusive { result =>
          // Exhaustive on the sealed Cursor, so a new variant becomes a compiler warning
          // instead of silently ending the stream.
          result.paging match {
            case Next(_)  => true
            case Finished => false
          }
        }

    // Flatten the results and attach children with stream operators rather than building a
    // nested Observable per page and per element: `concatMapIterable` unrolls each list in
    // order and `mapEval` runs one child lookup per element, sequentially.
    //
    // NOTE(review): if a batch child API becomes available, buffering the parents (for
    // example with `bufferTumbling(10)`) ahead of the `mapEval` step is presumably where
    // the batching would go — not implemented here.
    //
    // In this version the children have no additional pages, so a single task per child is
    // enough; in reality a child lookup is another sequence of one or more calls.
    val pages: Observable[Page] =
      parentResults
        .concatMapIterable(_.data)
        .mapEval(withChild)
        .concatMapIterable(fetched => fetched)

    val collector: Consumer[Page, List[Page]] =
      Consumer.foldLeft(List.empty[Page])((acc, page) => page :: acc)

    pages.take(300).consumeWith(collector)
  }

  /** Produces the next step of the parent-page unfold.
    *
    * @param pageResult the page result that was just fetched
    * @return the result to emit, paired with the task that yields the following result. For
    *         `Next(i)` that task is `pf.get(i)`. For `Finished` the same result is replayed
    *         as the next state; `go` stops the stream right after emitting a `Finished`
    *         result, so the replayed state is not emitted again there.
    */
  def onNextPage(pageResult: PageResult[Page]): Task[(PageResult[Page], Task[PageResult[Page]])] =
    pageResult.paging match {
      case Next(i)  => Task.now((pageResult, pf.get(i)))
      case Finished => Task.now((pageResult, Task.now(pageResult)))
    }

  /** Expands one page into the elements to emit for it: a parent with a child yields the
    * fetched child followed by the parent; any other page yields just itself.
    */
  private def withChild(page: Page): Task[List[Page]] =
    page match {
      case parent: ParentPage =>
        parent.child.fold(Task.now(List[Page](parent))) { childId =>
          cf.get(childId).map(child => List[Page](child, parent))
        }
      case child: ChildPage =>
        Task.now(List[Page](child))
    }
}
/** Demo entry point: runs the `Go` pipeline against the in-memory fetchers and prints the
  * collected pages.
  */
object GoApp extends App {
  // Wire the in-memory fetchers into the pipeline.
  val pf = new ParentFetch
  val cf = new ChildFetch
  val go = new Go(pf, cf)

  // Start the task and block until it completes; blocking is acceptable here because this
  // is the outermost edge of the program.
  val result = go.go.runToFuture
  val r = Await.result(result, atMost = Duration.Inf)

  println("\n **** \n")
  println(r)
}
/** A single element returned by the paged APIs: either a parent or a child. */
sealed trait Page
/** A parent element; `child` holds the id of its child, if it has one. */
final case class ParentPage(id: String, child: Option[String]) extends Page
/** A child element; `parent` holds the id of the parent it belongs to. */
final case class ChildPage(id: String, parent: String) extends Page
/** One page of results: the elements on the page plus the cursor describing what follows. */
final case class PageResult[T](data: List[T], paging: Cursor)
/** Paging state attached to a [[PageResult]]. */
sealed trait Cursor
/** More results are available; `i` is the number of the page to request next. */
final case class Next(i: Int) extends Cursor
/** There are no further pages after this one. */
case object Finished extends Cursor
import monix.eval.Task
/** In-memory stand-in for the paged parent API. */
class ParentFetch {
  type Result = PageResult[Page]

  // Fixture parents; p3, p4 and p5 each reference a child.
  val p1 = ParentPage("p1", None)
  val p2 = ParentPage("p2", None)
  val p3 = ParentPage("p3", Some("c1"))
  val p4 = ParentPage("p4", Some("c2"))
  val p5 = ParentPage("p5", Some("c3"))
  val p6 = ParentPage("p6", None)

  // Two pages of parents, keyed by page number; in reality there can be thousands.
  val parentMap: Map[Int, PageResult[Page]] = Map(
    1 -> PageResult[Page](List(p1, p2, p3, p4), Next(2)),
    2 -> PageResult[Page](List(p5, p6), Finished)
  )

  /** Looks up page `i`.
    *
    * @return a task that succeeds with the stored page result, or fails with an `Exception`
    *         ("no more parents") when no page is stored under `i`
    */
  def get(i: Int): Task[Result] =
    parentMap.get(i) match {
      case Some(found) => Task.now(found)
      case None        => Task.raiseError[Result](new Exception("no more parents"))
    }
}
/** In-memory stand-in for the child API. */
class ChildFetch {
  // Fixture children, each pointing back at its parent's id.
  val c1 = ChildPage("c1", "p3")
  val c2 = ChildPage("c2", "p4")
  val c3 = ChildPage("c3", "p5")

  // In reality the children are also paged (a parent can have thousands of children in
  // prod); that is simplified here to one child per id.
  val childMap: Map[String, ChildPage] = Map(
    "c1" -> c1,
    "c2" -> c2,
    "c3" -> c3
  )

  /** Looks up the child stored under id `s`.
    *
    * @return a task that succeeds with the child page, or fails with an `Exception` ("boop")
    *         when no child is stored under `s`
    */
  def get(s: String): Task[Page] =
    childMap.get(s) match {
      case Some(child) => Task.now(child)
      case None        => Task.raiseError[Page](new Exception("boop"))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment