// Skip to content
//
// Instantly share code, notes, and snippets.

// Looks up one Person per id; every call produces its own Future.
val f: Int => Future[Person] = ???
val ids = List(1, 2, 3, 4, 5)

// Future.traverse takes the collection and the function in separate parameter
// lists: it applies f to each id and gathers the results into a single Future.
val persons: Future[List[Person]] = Future.traverse(ids)(f)

// Mapping f over the ids yields one Future per id instead of one combined Future.
// NOTE(review): the original comment called this "sequential computation"; with
// scala.concurrent.Future each Future starts running as soon as f is called, so
// confirm which Future type is meant before relying on that wording.
val persons2: List[Future[Person]] = ids.map(f)

// NOTE(review): `.sequence` on a List presumably comes from a syntax import that
// is not shown in this snippet — confirm.
persons2.sequence : Future[List[Person]] // now you can wait on one Future instead of many Futures
/**
 * Converts raw listings into full listings and passes their `toJson`
 * rendering to `save`.
 *
 * @param transform maps one raw listing to its full form
 * @param save      receives the rendered string and returns an `Async[Unit]`
 */
class ListingProcessor(transform: RawListing => FullListing,
                       save: String => Async[Unit]) {

  /**
   * Transforms every listing, renders the result with `toJson` and saves it.
   *
   * @param listings the raw listings to process
   * @return whatever `save` returns for the rendered listings
   */
  def process(listings: Seq[RawListing]): Async[Unit] = {
    val transformed = for (raw <- listings) yield transform(raw)
    // ...logic, etc
    val payload = transformed.toJson
    save(payload)
  }
}
// Fixture for exercising ListingProcessor with a hand-written save function.
val input: Seq[RawListing] = //... fixture
def transform(in: RawListing): FullListing = //... trivial
// Holds the JSON most recently passed to save; stays null until save's Async block runs.
var savedResult: String = null
// Stand-in for the real save function: records the JSON it is given in savedResult.
def save(json: String): Async[Unit] = Async {
  savedResult = json
}
val expectedResult = //... actually generate some sort of JSON

val processor = new ListingProcessor(transform, save)
/**
 * A destination that accepts values of type `A`.
 *
 * `A` only ever appears as the input of `apply`, so the parameter is declared
 * contravariant: a `Sink[Any]` can be used wherever a `Sink[String]` is
 * expected, and contravariant implementations may extend `Sink[A]` directly.
 *
 * @tparam A the type of value this sink accepts
 */
trait Sink[-A] {
  /**
   * Hands `a` to this sink.
   *
   * @param a the value to deliver
   * @return an `Async[Unit]` describing the delivery
   */
  def apply(a: A): Async[Unit]
}
/**
 * Converts raw listings into full listings and delivers them to a sink.
 *
 * @param transform maps one raw listing to its full form
 * @param save      sink that receives the transformed listings
 */
class ListingProcessor(transform: RawListing => FullListing,
                       save: Sink[Seq[FullListing]]) {

  /**
   * Transforms every listing and passes the resulting sequence to `save`.
   *
   * @param listings the raw listings to process
   * @return whatever `save` returns for the transformed listings
   */
  def process(listings: Seq[RawListing]): Async[Unit] = {
    val transformed = for (raw <- listings) yield transform(raw)
    // ...logic, etc
    save(transformed)
  }
}
/**
 * Sink that encodes each value with the implicit `Encoder`, renders it with
 * `noSpaces`, and forwards the resulting string to `internalSink`.
 *
 * @param internalSink sink that receives the rendered string
 * @param encode       encoder used to convert each `A` before rendering
 */
class JsonSink[-A](internalSink: Sink[String])(implicit encode: Encoder[A]) extends Sink[A] {
  /** Encodes `a`, renders it, and passes the string on to `internalSink`. */
  override def apply(a: A): Async[Unit] = {
    val rendered: String = encode(a).noSpaces
    internalSink(rendered)
  }
}
/**
 * Sink that sends each string through `http.put` and discards the response.
 *
 * @param http service used to issue the put call
 */
class HttpSink(http: HttpService) extends Sink[String] {
  /** Puts `requestBody` via `http`, mapping the result to `Unit`. */
  def apply(requestBody: String): Async[Unit] =
    for (_ <- http.put(requestBody)) yield ()
}
// Application wiring: builds the sink chain and passes it to a ListingProcessor.
// httpSink wraps an HttpService created for the /v1/listings URL under config.apiHost.
val httpSink = new HttpSink(HttpService(s"${config.apiHost}/v1/listings"))
// jsonSink accepts Seq[FullListing] values and forwards to httpSink.
val jsonSink = new JsonSink[Seq[FullListing]](httpSink)
// The transformer's definition is elided in this snippet.
val listingTransformer = // ...
new ListingProcessor(listingTransformer, jsonSink)
/**
 * Sink that keeps every value it receives in an in-memory buffer, in arrival
 * order, so the values can be inspected afterwards.
 *
 * @tparam A the type of value recorded
 */
class MemorySink[A] extends Sink[A] {
  // Values appended so far; only written from inside apply's Async block.
  private var buffer: Vector[A] = Vector.empty

  /** All values recorded so far, oldest first. */
  def replay: Vector[A] = buffer

  /** The most recently recorded value, or `None` if nothing has been recorded. */
  def last: Option[A] = buffer.lastOption

  /** Appends `a` to the buffer inside an `Async` block. */
  override def apply(a: A): Async[Unit] = Async {
    buffer :+= a
  }
}
// Example test: run a ListingProcessor against a MemorySink and inspect what was saved.
val outputListing: FullListing = //...
val inputListing: RawListing = //...
// The sink's element type matches the processor's `save: Sink[Seq[FullListing]]` parameter.
val save = new MemorySink[Seq[FullListing]]
// The transform ignores its input and always yields outputListing, matching the
// `RawListing => FullListing` parameter of the ListingProcessor declared in this file.
val processor = new ListingProcessor(_ => outputListing, save)

// `process` is the method the ListingProcessor in this file defines.
processor.process(Seq(inputListing))
// resolve asynchrony

// The last value handed to the sink should be the one transformed listing.
save.last should beSome(Seq(outputListing))