Created
May 15, 2017 03:38
-
-
Save macalinao/b6d187c084e992ab8d5f8d4adab25ec5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.asuna.cantor.model | |
import monix.reactive.Observable | |
import monix.eval.Task | |
import cats._, implicits._ | |
import monix.cats._ | |
/**
 * Something you can read from.
 *
 * @tparam K type of the keys used to look items up
 * @tparam V type of the values that are read
 */
trait Source[K, V] {

  /**
   * Reads a single item, returning `None` if it does not exist.
   *
   * @param in the key to look up
   */
  def read(in: K): Task[Option[V]]

  /**
   * Reads multiple items. The default implementation assumes you are
   * using a store where it is optimal to parallelize reads: it issues one
   * `read` per key through `mapAsync` and drops keys for which `read`
   * yields `None`.
   * You may want to override this if you want to read items in batch.
   *
   * NOTE(review): whether results are emitted in the order of `in` depends on
   * the semantics of `mapAsync(parallelism)` in the Monix version in use —
   * confirm before relying on ordering.
   *
   * @param in          the keys to read
   * @param parallelism passed straight to `mapAsync`
   * @return an observable of `(key, value)` pairs, one per key that was found
   */
  def readMany(in: List[K], parallelism: Int = 10): Observable[(K, V)] =
    Observable
      .fromIterable(in)
      .mapAsync(parallelism) { k =>
        // Keep the key next to its value so callers can tell results apart.
        read(k).map(v => v.map(value => (k, value)))
      }
      .collect { case Some(x) => x }

  /**
   * Reads the given keys through `readMany` (using its default parallelism)
   * and gathers the emitted pairs into a `Map`.
   *
   * Keys for which nothing was found are absent from the result. If the same
   * key is emitted more than once, the pair emitted last wins.
   *
   * @param in the keys to read
   */
  def readMap(in: List[K]): Task[Map[K, V]] =
    readMany(in).foldLeftL(Map[K, V]())(_ + _)
}
/**
 * Companion of the `Source` trait. Extending `SourceInstances` makes the
 * implicit instances declared there members of this companion object.
 */
object Source extends SourceInstances
/**
 * Type class instances for `Source`, mixed into the `Source` companion object.
 */
private[model] sealed trait SourceInstances {

  /**
   * `Functor` and `Cartesian` instance for `Source[K, ?]`. Allows us to map over
   * the values of a source and to take products of sources that share a key type.
   *
   * The type parameter `V` is not used by the instance; it is kept so that any
   * existing call site that passes it explicitly keeps compiling.
   */
  implicit def cartesianInstance[K, V]: Functor[Source[K, ?]] with Cartesian[Source[K, ?]] =
    new Functor[Source[K, ?]] with Cartesian[Source[K, ?]] {

      /**
       * Combines two sources into one that reads the same key from both.
       * The combined `read` yields a pair only when both sources have a value
       * for the key; if either yields `None`, the result is `None`.
       */
      def product[A, B](fa: Source[K, A], fb: Source[K, B]): Source[K, (A, B)] =
        new Source[K, (A, B)] {
          override def read(in: K): Task[Option[(A, B)]] = {
            // NOTE(review): whether the two reads run concurrently depends on the
            // `Task` instance provided by `monix.cats._` — confirm if it matters.
            fa.read(in).product(fb.read(in)).map { case (a, b) =>
              a.product(b)
            }
          }
          // We do not override `readMany` here as we do not know anything about the underlying source.
        }

      /**
       * Builds a source that reads from `fa` and applies `f` to every value found.
       */
      def map[A, B](fa: Source[K, A])(f: A => B): Source[K, B] =
        new Source[K, B] {
          override def read(in: K): Task[Option[B]] = {
            fa.read(in).map(_.map(f))
          }

          /**
           * This is overridden to delegate to a possibly optimal underlying readMany.
           */
          override def readMany(in: List[K], parallelism: Int = 10): Observable[(K, B)] = {
            fa.readMany(in, parallelism).map { case (k, v) =>
              (k, f(v))
            }
          }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment