Skip to content

Instantly share code, notes, and snippets.

@slyphon
Created July 11, 2013 03:04
Show Gist options
  • Save slyphon/5972217 to your computer and use it in GitHub Desktop.
Save slyphon/5972217 to your computer and use it in GitHub Desktop.
A super useful class for dealing with Futures and collections and dealing with results from backend data stores. Shout out to @caniszczyk for giving the OK to share this with the community. Authors: Jeremy Cloud, Kevin Oliver, Glen Sanford, and Evan Meagher
/**
* Copyright 2013 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.example.keyvalue
import com.twitter.util.{Future, Return, Throw, Try}
/**
 * Factory and combination helpers for [[KeyValueResult]].
 *
 * Every `from*` method turns a failure of the supplied future into a result in which
 * each of the given keys is recorded as failed with that throwable, so the returned
 * future itself does not carry that failure.
 */
object KeyValueResult {
  // Shared empty instances. They hold no elements, so casting them to any K/V is safe.
  private val Empty = KeyValueResult()
  private val EmptyFuture = Future.value(Empty)

  /** Returns the shared empty result, typed as requested. */
  def empty[K, V]: KeyValueResult[K, V] =
    Empty.asInstanceOf[KeyValueResult[K, V]]

  /** Returns an already-satisfied future holding the shared empty result. */
  def emptyFuture[K, V]: Future[KeyValueResult[K, V]] =
    EmptyFuture.asInstanceOf[Future[KeyValueResult[K, V]]]

  /**
   * Builds a result in which every one of `keys` is marked as failed with `t`.
   */
  private def allFailed[K, V](keys: Iterable[K], t: Throwable): KeyValueResult[K, V] =
    KeyValueResult[K, V](failed = keys.map(_ -> t).toMap)

  /**
   * Pairs `keys` with the elements of the future sequence, converts each element to a
   * `Try[Option[V]]` with `classify`, and files it into the matching bucket.
   *
   * The pairing uses `zip`, so if the two sequences differ in length the surplus keys
   * (or surplus results) are silently ignored. If the future fails, every key is
   * recorded as failed.
   */
  private def fromSeqWith[K, V, R](
    keys: Iterable[K],
    futureSeq: Future[Seq[R]]
  )(
    classify: R => Try[Option[V]]
  ): Future[KeyValueResult[K, V]] =
    futureSeq.map { seq =>
      val bldr = new KeyValueResultBuilder[K, V]
      keys.zip(seq) foreach { case (key, res) => bldr.update(key, classify(res)) }
      bldr.result()
    }.handle { case t => allFailed[K, V](keys, t) }

  /**
   * Builds a future KeyValueResult using a future sequence of key-value tuples. That
   * sequence does not necessarily match up with the sequence of keys provided. The
   * sequence of pairs represents the found results. notFound will be filled in from the
   * missing keys.
   *
   * @param keys        the keys that were requested
   * @param futurePairs the pairs that were found
   */
  def fromPairs[K, V](
    keys: Iterable[K] = Nil
  )(
    futurePairs: Future[TraversableOnce[(K, V)]]
  ): Future[KeyValueResult[K, V]] =
    fromMap[K, V](keys)(futurePairs map { pairs => pairs.toMap })

  /**
   * Builds a future KeyValueResult using a future map of found results. notFound will be
   * filled in from the keys that are absent from the map.
   *
   * If `futureMap` fails, each of `keys` is recorded as failed. Note that with the
   * default empty `keys` a failed future therefore produces an empty result.
   *
   * @param keys      the keys that were requested
   * @param futureMap the entries that were found
   */
  def fromMap[K, V](
    keys: Iterable[K] = Nil
  )(
    futureMap: Future[Map[K, V]]
  ): Future[KeyValueResult[K, V]] =
    futureMap.map { found =>
      KeyValueResult[K, V](
        found = found,
        notFound = keys.toSet -- found.keySet)
    }.handle { case t => allFailed[K, V](keys, t) }

  /**
   * Builds a future KeyValueResult using a future sequence of optional results. That
   * sequence must match up pair-wise with the given sequence of keys. A value of Some[V] is
   * counted as a found result, a value of None is counted as a notFound result.
   */
  def fromSeqOption[K, V](
    keys: Iterable[K]
  )(
    futureSeq: Future[Seq[Option[V]]]
  ): Future[KeyValueResult[K, V]] =
    fromSeqWith[K, V, Option[V]](keys, futureSeq) { opt => Return(opt) }

  /**
   * Builds a future KeyValueResult using a future sequence of Try results. That
   * sequence must match up pair-wise with the given sequence of keys. A value of Return[V] is
   * counted as a found result, a value of Throw is counted as a failed result.
   */
  def fromSeqTry[K, V](
    keys: Iterable[K]
  )(
    futureSeq: Future[Seq[Try[V]]]
  ): Future[KeyValueResult[K, V]] =
    fromSeqWith[K, V, Try[V]](keys, futureSeq) {
      case Return(value) => Return(Some(value))
      case Throw(t) => Throw(t)
    }

  /**
   * Builds a future KeyValueResult using a sequence of future options. That sequence must
   * match up pair-wise with the given sequence of keys. A value of Some[V] is
   * counted as a found result, a value of None is counted as a notFound result, and a
   * failed future is counted as a failed result for its key only.
   */
  def fromSeqFuture[K, V](
    keys: Iterable[K]
  )(
    futureSeq: Seq[Future[Option[V]]]
  ): Future[KeyValueResult[K, V]] = {
    // Lift each future's outcome into a Try so that one failure cannot fail the
    // collected future; failures are then attributed per key.
    val lifted = futureSeq map { f => f.transform(t => Future.value(t)) }
    fromSeqTryOptions(keys)(Future.collect(lifted))
  }

  /**
   * Builds a future KeyValueResult using a future sequence of Try[Option[V]]. That sequence must
   * match up pair-wise with the given sequence of keys. A value of Return[Some[V]] is
   * counted as a found result, a value of Return[None] is counted as a notFound result, and a value
   * of Throw[V] is counted as a failed result.
   */
  def fromSeqTryOptions[K, V](
    keys: Iterable[K]
  )(
    futureSeq: Future[Seq[Try[Option[V]]]]
  ): Future[KeyValueResult[K, V]] =
    fromSeqWith[K, V, Try[Option[V]]](keys, futureSeq) { res => res }

  /**
   * Reduces several KeyValueResults down to just 1, by combining as if by ++, but
   * more efficiently with fewer intermediate results.
   *
   * When the same key shows up in more than one bucket, found takes precedence over
   * notFound, and both take precedence over failed.
   */
  def sum[K, V](results: Iterable[KeyValueResult[K, V]]): KeyValueResult[K, V] = {
    val bldr = new KeyValueResultBuilder[K, V]
    results foreach { result =>
      bldr.addFound(result.found)
      bldr.addNotFound(result.notFound)
      bldr.addFailed(result.failed)
    }
    val res = bldr.result()
    if (res.notFound.isEmpty && res.failed.isEmpty) {
      // Only found entries: there is nothing that could conflict across buckets.
      res
    } else {
      val notFound = res.notFound -- res.found.keySet
      val failed = res.failed -- res.found.keySet -- res.notFound
      KeyValueResult(res.found, notFound, failed)
    }
  }
}
/**
 * The outcome of looking up a set of keys, split into three buckets.
 *
 * Viewed as an `Iterable`, each key is paired with a `Try[Option[V]]`:
 * `Return(Some(v))` for found, `Return(None)` for notFound and `Throw(t)` for failed.
 *
 * @param found    keys that produced a value, with that value
 * @param notFound keys for which no value exists
 * @param failed   keys whose lookup failed, with the cause
 */
case class KeyValueResult[K, +V](
  found: Map[K, V] = Map.empty[K, V],
  notFound: Set[K] = Set.empty[K],
  failed: Map[K, Throwable] = Map.empty[K, Throwable]
) extends Iterable[(K, Try[Option[V]])] {

  /**
   * A cheaper implementation of isEmpty than the default which relies
   * on building an iterator.
   */
  override def isEmpty: Boolean =
    found.isEmpty && notFound.isEmpty && failed.isEmpty

  /**
   * Maps over the keyspace to produce a new KeyValueResult.
   *
   * Each bucket is mapped independently; the buckets are not reconciled afterwards,
   * so `f` should not map distinct keys onto the same new key.
   */
  def mapKeys[K2](f: K => K2): KeyValueResult[K2, V] =
    copy(
      found = found map { case (key, value) => f(key) -> value },
      notFound = notFound map f,
      failed = failed map { case (key, cause) => f(key) -> cause }
    )

  /**
   * Maps over found values to produce a new KeyValueResult. If the given function throws an
   * exception for a particular value, that value will be moved to the `failed` bucket with
   * the thrown exception.
   */
  def mapFound[V2](f: V => V2): KeyValueResult[K, V2] =
    mapValues[V2] { tried => tried map { opt => opt map f } }

  /**
   * Maps over the values provided by the iterator, to produce a new KeyValueResult.
   * Each key is re-filed into the bucket that matches the value returned by `f`.
   */
  def mapValues[V2](f: Try[Option[V]] => Try[Option[V2]]): KeyValueResult[K, V2] = {
    val builder = new KeyValueResultBuilder[K, V2]
    iterator foreach { case (key, value) => builder.update(key, f(value)) }
    builder.result()
  }

  /**
   * Returns an Iterator that yields all found, notFound, and failed values
   * (in that order) represented in the combined Try[Option[V]] type.
   */
  def iterator: Iterator[(K, Try[Option[V]])] =
    (found.iterator map { case (k, v) => k -> Return(Some(v)) }) ++
      (notFound.iterator map { k => k -> Return(None) }) ++
      (failed.iterator map { case (k, t) => k -> Throw(t) })

  /**
   * Returns a copy in which all failed entries are converted to misses. The specific
   * failure information is lost.
   */
  def convertFailedToNotFound: KeyValueResult[K, V] =
    copy(
      notFound = notFound ++ failed.keySet,
      failed = Map.empty[K, Throwable]
    )

  /**
   * Returns a copy in which all not-found entries are converted to failures.
   *
   * @param f produces the throwable to record for each not-found key
   */
  def convertNotFoundToFailed(f: K => Throwable): KeyValueResult[K, V] =
    copy(
      notFound = Set.empty[K],
      failed = failed ++ (notFound map { k => k -> f(k) })
    )

  /**
   * Combines two KeyValueResults. Conflicting founds/notFounds are resolved
   * as founds, and conflicting (found|notFound)/failures are resolved as (found|notFound).
   */
  def ++[K2 >: K, V2 >: V](that: KeyValueResult[K2, V2]): KeyValueResult[K2, V2] = {
    if (this.isEmpty) that
    // K is invariant, so widening `this` to K2 needs a cast.
    else if (that.isEmpty) this.asInstanceOf[KeyValueResult[K2, V2]]
    else {
      val found = this.found ++ that.found
      val notFound = this.notFound ++ that.notFound -- found.keySet
      val failed = this.failed ++ that.failed -- found.keySet -- notFound
      KeyValueResult(found, notFound, failed)
    }
  }

  /**
   * Looks up a result for a key. `found` is consulted first, then `failed`; a key
   * present in neither yields `Return(None)`, whether or not it is in `notFound`.
   */
  def apply(key: K): Try[Option[V]] =
    found.get(key) match {
      case hit @ Some(_) => Return(hit)
      case None =>
        failed.get(key) match {
          case Some(cause) => Throw(cause)
          case None => Return(None)
        }
    }

  /**
   * Looks up a result for a key, returning a provided default if the key is not
   * found or failed.
   */
  def getOrElse[V2 >: V](key: K, default: => V2): V2 =
    found.getOrElse(key, default)

  /**
   * If any keys fail, will return the first failure (as ordered by the `failed` map's
   * own iteration). Otherwise, will convert founds/notFounds to a Seq[Option[V]],
   * ordered by the keys provided.
   */
  def toFutureSeqOfOptions(keys: Seq[K]): Future[Seq[Option[V]]] =
    failed.values.headOption match {
      case Some(cause) => Future.exception(cause)
      case None => Future.value(keys map { k => found.get(k) })
    }

  // This is unfortunate, but we end up pulling in Iterable's toString,
  // which is not all that readable.
  override def toString(): String =
    new StringBuilder(256)
      .append("KeyValueResult(")
      .append("found = ")
      .append(found)
      .append(", notFound = ")
      .append(notFound)
      .append(", failed = ")
      .append(failed)
      .append(')')
      .toString()
}
/**
 * Mutable builder that accumulates found, notFound and failed entries and produces a
 * [[KeyValueResult]]. Every `add*` method and `update` returns this builder so that
 * calls can be chained.
 *
 * The buckets are accumulated independently; `result()` does not reconcile a key that
 * was added to more than one bucket.
 */
class KeyValueResultBuilder[K, V] {
  private[keyvalue] val found = Map.newBuilder[K, V]
  private[keyvalue] val notFound = Set.newBuilder[K]
  private[keyvalue] val failed = Map.newBuilder[K, Throwable]

  /** Records `k` as found with value `v`. */
  def addFound(k: K, v: V): KeyValueResultBuilder[K, V] = { found += (k -> v); this }

  /** Records `k` as not found. */
  def addNotFound(k: K): KeyValueResultBuilder[K, V] = { notFound += k; this }

  /** Records `k` as failed with cause `t`. */
  def addFailed(k: K, t: Throwable): KeyValueResultBuilder[K, V] = { failed += (k -> t); this }

  /** Records every pair in `kvs` as found. */
  def addFound(kvs: Iterable[(K, V)]): KeyValueResultBuilder[K, V] = { found ++= kvs; this }

  /** Records every key in `ks` as not found. */
  def addNotFound(ks: Iterable[K]): KeyValueResultBuilder[K, V] = { notFound ++= ks; this }

  /** Records every pair in `kts` as failed. */
  def addFailed(kts: Iterable[(K, Throwable)]): KeyValueResultBuilder[K, V] = { failed ++= kts; this }

  /**
   * Files `k` into the bucket matching `tryV`: `Throw` goes to failed, `Return(None)`
   * to notFound and `Return(Some(v))` to found.
   */
  def update(k: K, tryV: Try[Option[V]]): KeyValueResultBuilder[K, V] =
    tryV match {
      case Throw(t) => addFailed(k, t)
      case Return(None) => addNotFound(k)
      case Return(Some(v)) => addFound(k, v)
    }

  /** Builds a KeyValueResult from everything added so far. */
  def result(): KeyValueResult[K, V] =
    KeyValueResult(found.result(), notFound.result(), failed.result())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment