Skip to content

Instantly share code, notes, and snippets.

View decorator13.md
/** A destination that consumes values of type `A`.
  *
  * `A` only ever appears in input position (the argument of [[apply]] and the
  * result of the function given to [[contraMap]]), so the trait is declared
  * contravariant. This lets contravariant implementations such as
  * `class JsonSink[-A] ... extends Sink[A]` type-check, and allows a
  * `Sink[Animal]` to be used wherever a `Sink[Dog]` is expected.
  *
  * @tparam A the type of value this sink accepts
  */
trait Sink[-A] {

  /** Hands `a` to this sink.
    *
    * @param a the value to consume
    * @return an `Async[Unit]`; it carries no result value
    */
  def apply(a: A): Async[Unit]

  /** Derives a sink for `B` from this sink.
    *
    * Each `B` is converted with `f` and the result is passed to [[apply]].
    *
    * @param f conversion from the new input type to this sink's input type
    * @tparam B the input type of the derived sink
    * @return a sink that applies `f` before delegating to this one
    */
  final def contraMap[B](f: B => A): Sink[B] =
    // The lambda is turned into a Sink[B] because apply is the only abstract member.
    b => apply(f(b))
}
View decorator12.md
// Example test: drive a ListingProcessor with F = Id against an in-memory sink,
// then check the last value the sink received.
// Fixture values (right-hand sides elided in this snippet).
val outputListing: FullListing = //...
val inputListing: RawListing = //...
val save = new MemorySink[ListingResults] // MemorySink[A] extends Sink[Id,A] now
// NOTE(review): the transform here returns Some(...), and the call below uses
// `processListing`; the ListingProcessor[F] shown in this listing takes
// `RawListing => FullListing` and names the method `process` - confirm which
// revision of the class this example targets.
val processor = new ListingProcessor[Id](_ => Some(outputListing), save)

processor.processListing(Seq(inputListing))

// No step to resolve asynchrony appears between the call and this check.
save.last should beSome(Seq(outputListing))
View decorator11.md
/** Converts raw listings and hands the converted batch to a sink.
  *
  * @param transform conversion applied to every incoming listing
  * @param save      sink that receives the whole converted batch in one call
  * @tparam F effect type produced by the sink; a `Monad[F]` instance is
  *           required by the signature, although the code shown here does
  *           not call it directly
  */
class ListingProcessor[F[_]: Monad](
    transform: RawListing => FullListing,
    save: Sink[F, Seq[FullListing]]
) {

  /** Transforms each listing, then saves the results as a single batch.
    *
    * @param listings the raw listings to process
    * @return whatever effect the sink produces for the batch
    */
  def process(listings: Seq[RawListing]): F[Unit] =
    // Any additional processing logic (elided in this snippet) belongs before the save.
    save(listings.map(transform))
}
View decorator10.md
/** A destination that consumes values of type `A`, with the result expressed
  * in a caller-chosen type constructor `F`.
  *
  * @tparam F the wrapper type of the result returned by [[apply]]
  * @tparam A the type of value this sink accepts
  */
trait Sink[F[_], A] {
  /** Hands `a` to this sink.
    *
    * @param a the value to consume
    * @return an `F[Unit]`; it carries no result value
    */
  def apply(a: A): F[Unit]
}
View decorator9.md
// Example test: drive a ListingProcessor against an in-memory sink, then check
// the last value the sink received.
// Fixture values (right-hand sides elided in this snippet).
val outputListing: FullListing = //...
val inputListing: RawListing = //...
val save = new MemorySink[ListingResults]
// NOTE(review): the transform here returns Task.now(Some(...)), and the call
// below uses `processListing`; the ListingProcessor shown in this listing takes
// `RawListing => FullListing` and names the method `process` - confirm which
// revision of the class this example targets.
val processor = new ListingProcessor(_ => Task.now(Some(outputListing)), save)

processor.processListing(Seq(inputListing))
// resolve asynchrony
// (placeholder above: the asynchronous result has to be resolved before the check below)

save.last should beSome(Seq(outputListing))
View decorator8.md
class MemorySink[A] extends Sink[A]{
  private var buffer: Vector[A] = Vector.empty

  def replay: Vector[A] = buffer

  def last: Option[A] = buffer.lastOption

  override def apply(a: A): Async[Unit] = Async {
    buffer :+= a
View decorator7.md
// Application wiring
// Innermost sink: HttpSink passes each request body string to `http.put`.
val httpSink = new HttpSink(HttpService(s"${config.apiHost}/v1/listings"))
// Decorator: JsonSink encodes each batch to a string and forwards it to httpSink.
val jsonSink = new JsonSink[Seq[FullListing]](httpSink)
// NOTE(review): the right-hand side is elided; if left as written, the
// expression on the following line would be parsed as this val's initializer.
val listingTransformer = // ...
// The processor only sees a Sink[Seq[FullListing]]; the JSON/HTTP layering stays hidden behind it.
new ListingProcessor(listingTransformer, jsonSink)
View decorator6.md
/** A [[Sink]] of request bodies that passes each one to `http.put`.
  *
  * @param http the service used to send each body
  */
class HttpSink(http: HttpService) extends Sink[String] {

  /** Sends `requestBody` through `http.put` and discards whatever it yields.
    *
    * @param requestBody the body to send
    * @return the result of the `put` call with its value replaced by `()`
    */
  def apply(requestBody: String): Async[Unit] =
    http.put(requestBody).map { _ => () }
}
View decorator5.md
/** A [[Sink]] decorator that encodes each value to a string and forwards the
  * string to an underlying `Sink[String]`.
  *
  * @param internalSink the sink that receives the encoded text
  * @param encode       encoder used to turn an `A` into its encoded form
  * @tparam A the type of value accepted by this sink
  */
class JsonSink[-A](internalSink: Sink[String])(implicit encode: Encoder[A]) extends Sink[A] {

  /** Encodes `a`, renders it with `noSpaces`, and passes the text on.
    *
    * @param a the value to encode and forward
    * @return whatever the underlying sink returns for the rendered text
    */
  override def apply(a: A): Async[Unit] = {
    val rendered = encode(a).noSpaces
    internalSink(rendered)
  }
}
View decorator4.md
/** Converts raw listings and hands the converted batch to a sink.
  *
  * @param transform conversion applied to every incoming listing
  * @param save      sink that receives the whole converted batch in one call
  */
class ListingProcessor(
    transform: RawListing => FullListing,
    save: Sink[Seq[FullListing]]
) {

  /** Transforms each listing, then saves the results as a single batch.
    *
    * @param listings the raw listings to process
    * @return the `Async[Unit]` produced by the sink for the batch
    */
  def process(listings: Seq[RawListing]): Async[Unit] =
    // Any additional processing logic (elided in this snippet) belongs before the save.
    save(listings.map(transform))
}
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.