// Source: GitHub gist joshcough/b29a70be8c6d65da0fc8 by @joshcough, created July 18, 2015 15:14.
import com.google.common.util.concurrent.{FutureCallback, Futures, MoreExecutors, ListenableFuture}
import java.util.concurrent.{Callable, Executors}
import scalaz.\/
import scalaz.concurrent.Task
import scalaz.stream._
import scalaz.syntax.either._
/**
 * Small demo harness that pushes a list of records through a
 * `ListenableFuture`-returning channel by lifting each call into a scalaz
 * `Task` and driving the calls with a scalaz-stream `Process`.
 */
object TestWriter {
  import scala.util.control.NonFatal

  /**
   * Writes three sample strings through [[testWriter]] and then shuts the
   * executor down.
   *
   * The shutdown runs in a `finally` so the pool's worker threads are released
   * whether or not the write fails; `shutdown()` lets already-submitted work
   * finish rather than interrupting it.
   */
  def main(args: Array[String]): Unit =
    try testWriter.write(List("hello", ", ", "world."))
    finally executorService.shutdown()

  /** Cached thread pool, decorated by Guava so that `submit` returns a `ListenableFuture`. */
  val executorService =
    MoreExecutors.listeningDecorator(Executors.newCachedThreadPool())

  /**
   * Sample writer: each `String` payload is converted to a `List[Char]` on
   * [[executorService]], and the outcome of every record is printed.
   */
  val testWriter: Writer[String, List[Char]] = new Writer[String, List[Char]] {
    def onFailure(t: Throwable): Unit = println(s"failure: ${t.getMessage}")
    def onSuccess(res: List[Char]): Unit = println(s"success: $res")
    val outputStream: GenericChannel[ListenableFuture, String, List[Char]] =
      new GenericChannel[ListenableFuture, String, List[Char]] {
        def feed(payload: String): ListenableFuture[List[Char]] =
          executorService.submit(new Callable[List[Char]] {
            def call(): List[Char] = payload.toList
          })
      }
  }

  /**
   * Something that accepts a payload and produces a result inside an effect.
   *
   * @tparam F effect wrapping the result (a `ListenableFuture` in this file)
   * @tparam I payload type
   * @tparam O result type
   */
  trait GenericChannel[F[_], I, O] {
    /** Submits `payload` and returns the (possibly pending) result. */
    def feed(payload: I): F[O]
  }

  /**
   * Writes records of type `I` to [[outputStream]], reporting each record's
   * outcome to [[onSuccess]] or [[onFailure]].
   *
   * @tparam I record type
   * @tparam O per-record result type produced by the channel
   */
  trait Writer[I, O] { self =>
    /** Destination channel; one `feed` call is made per record. */
    val outputStream: GenericChannel[ListenableFuture, I, O]

    /** Runs [[writeProcess]] to completion on the calling thread, discarding the emitted results. */
    def write(records: Seq[I]): Unit = writeProcess(records).run.run

    /** Hook invoked when a record's future fails. */
    def onFailure(t: Throwable): Unit

    /** Hook invoked with the result of each record whose future succeeds. */
    def onSuccess(result: O): Unit

    /**
     * Builds the process that sends every record through [[writeChannel]] and
     * emits the results.
     */
    def writeProcess(records: Seq[I]): Process[Task, O] =
      Process(records: _*).tee(writeChannel)(tee.zipApply).eval

    /** [[writeRecord]] lifted into a scalaz-stream `Channel`. */
    def writeChannel: Channel[Task, I, O] = channel.lift(writeRecord)

    /**
     * Feeds one record to [[outputStream]] and adapts the returned
     * `ListenableFuture` into a `Task` completed from a Guava callback.
     *
     * The callback also notifies this writer's [[onSuccess]] / [[onFailure]]
     * hooks. They are reached through `self` because the `FutureCallback`
     * methods have the same names and would otherwise shadow them.
     */
    def writeRecord(i: I): Task[O] =
      Task.async[O] { (cb: (Throwable \/ O) => Unit) =>
        Futures.addCallback(outputStream feed i, new FutureCallback[O]() {
          def onSuccess(result: O): Unit = {
            println("in onSuccess")
            // A throwing hook must not leave the Task uncompleted, so its
            // exception becomes the Task's failure instead of escaping here.
            val outcome: Throwable \/ O =
              try { self.onSuccess(result); result.right[Throwable] }
              catch { case NonFatal(e) => e.left[O] }
            cb(outcome)
            println("called callback...")
          }
          def onFailure(t: Throwable): Unit = {
            println("in onFailure")
            // Keep the original failure as the Task's error; a secondary
            // failure from the hook is attached rather than replacing it.
            try self.onFailure(t)
            catch { case NonFatal(e) => if (e ne t) t.addSuppressed(e) }
            cb(t.left)
            println("called callback...")
          }
        })
      }
  }
}
// (GitHub page footer) Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment