Skip to content

Instantly share code, notes, and snippets.

@macalinao
Created May 15, 2017 03:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save macalinao/b6d187c084e992ab8d5f8d4adab25ec5 to your computer and use it in GitHub Desktop.
Save macalinao/b6d187c084e992ab8d5f8d4adab25ec5 to your computer and use it in GitHub Desktop.
package io.asuna.cantor.model
import monix.reactive.Observable
import monix.eval.Task
import cats._, implicits._
import monix.cats._
/**
 * A keyed, read-only data source: given a key of type `K` it can asynchronously
 * look up a value of type `V`.
 *
 * @tparam K the type of key used for lookups
 * @tparam V the type of value produced by a lookup
 */
trait Source[K, V] {

  /**
   * Looks up a single key.
   *
   * @param in the key to look up
   * @return a task yielding `Some(value)` if the item exists, `None` otherwise
   */
  def read(in: K): Task[Option[V]]

  /**
   * Looks up several keys and streams back the `(key, value)` pairs that were found.
   * Keys whose lookup yields `None` are dropped from the resulting stream.
   *
   * This default implementation issues one [[read]] per key through `mapAsync`,
   * which suits stores where parallel single reads are optimal. Override it when
   * the underlying store can serve a batch of keys more efficiently.
   *
   * @param in          the keys to look up
   * @param parallelism the parallelism value handed to `mapAsync`
   * @return an observable of the `(key, value)` pairs that exist
   */
  def readMany(in: List[K], parallelism: Int = 10): Observable[(K, V)] = {
    // Look up one key and, when a value is present, pair it with that key.
    def lookup(key: K): Task[Option[(K, V)]] =
      read(key).map(found => found.map(value => (key, value)))

    Observable
      .fromIterable(in)
      .mapAsync(parallelism)(key => lookup(key))
      .collect { case Some(entry) => entry }
  }

  /**
   * Looks up several keys and gathers every pair that was found into a `Map`.
   * Delegates to [[readMany]] with its default parallelism; if the same key is
   * emitted more than once, the pair emitted last wins.
   *
   * @param in the keys to look up
   * @return a task yielding a map of the keys that exist to their values
   */
  def readMap(in: List[K]): Task[Map[K, V]] =
    readMany(in).foldLeftL(Map.empty[K, V]) { (acc, entry) => acc + entry }
}
/**
 * Companion object of `Source`. It extends `SourceInstances` so that the implicit
 * type class instances defined there are part of the implicit scope of `Source`.
 */
object Source extends SourceInstances
/**
 * Type class instances for `Source`, mixed into the `Source` companion object.
 */
private[model] sealed trait SourceInstances {

  /**
   * `Functor` and `Cartesian` instance for `Source[K, ?]`. It allows mapping over
   * the values of a source and taking the product of two sources that share a
   * key type.
   */
  implicit def cartesianInstance[K, V]: Functor[Source[K, ?]] with Cartesian[Source[K, ?]] =
    new Functor[Source[K, ?]] with Cartesian[Source[K, ?]] {

      /**
       * Combines two sources into one whose value is the pair of both values.
       * A key resolves to `Some` only when both underlying sources have it.
       */
      def product[A, B](fa: Source[K, A], fb: Source[K, B]): Source[K, (A, B)] =
        new Source[K, (A, B)] {
          override def read(in: K): Task[Option[(A, B)]] = {
            val both = fa.read(in).product(fb.read(in))
            both.map { results =>
              // Pair the two optional values; the result is empty if either side is.
              for {
                left  <- results._1
                right <- results._2
              } yield (left, right)
            }
          }
          // `readMany` is deliberately left at its default: nothing is known here
          // about how the underlying sources would best serve multiple keys.
        }

      /**
       * Builds a source that applies `f` to every value read from `fa`.
       */
      def map[A, B](fa: Source[K, A])(f: A => B): Source[K, B] =
        new Source[K, B] {
          override def read(in: K): Task[Option[B]] =
            fa.read(in).map(found => found.map(f))

          /**
           * Delegates to `fa.readMany` so that a possibly optimized batch read of
           * the underlying source is reused, then transforms each value with `f`.
           */
          override def readMany(in: List[K], parallelism: Int = 10): Observable[(K, B)] =
            fa.readMany(in, parallelism).map(entry => (entry._1, f(entry._2)))
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment