Skip to content

Instantly share code, notes, and snippets.

@pchlupacek
Last active April 22, 2017 04:57
Show Gist options
  • Save pchlupacek/989a2801036a9441da252726a1b4972d to your computer and use it in GitHub Desktop.
Save pchlupacek/989a2801036a9441da252726a1b4972d to your computer and use it in GitHub Desktop.
converting fs2 to scalaz.stream and vice versa
package spinoco.scalaz.stream
import fs2.Handle
import fs2._
import scalaz.concurrent.{Actor, Task}
import scalaz.stream.{Cause, Process, wye}
import scala.language.higherKinds
import scalaz.{-\/, \/, \/-}
import scalaz.\/._
object fs2Conversion {
// Implicit instance produced by fs2.Task.asyncInstance, built on the sequential strategy.
// The type is left to inference here.
// NOTE(review): presumably this is the fs2 Async[Task] type-class instance that the fs2 stream
// operations in this object resolve implicitly - confirm against the fs2 version in use.
implicit val F = fs2.Task.asyncInstance(fs2.Strategy.sequential)
/**
  * Converts an fs2 `Stream` into a scalaz-stream `Process`.
  *
  * A fresh `impl.syncActor` is created each time the resulting process is evaluated. The fs2
  * stream is started in the background and pushes its chunks to the actor; the returned process
  * pulls chunks from the same actor and emits their elements.
  *
  * @param fs2in the fs2 stream to convert
  * @return a process emitting the elements of `fs2in`
  */
def fs2ToProcess[A](fs2in:Stream[fs2.Task,A]):Process[Task,A] = Process.suspend {
  import impl._

  val bridge = syncActor[AttemptFS2,AttemptScalaz, A]

  // Hands one chunk to the actor; the task completes with `true` when the publisher may go on.
  def offer(chunk: Chunk[A]): fs2.Task[Boolean] =
    fs2.Task.unforkedAsync[Boolean] { cb =>
      bridge ! OfferChunk[AttemptFS2,A](chunk, cb)
    }

  // For performance reasons the chunks are pushed manually instead of using chunk & evalMap.
  def pushAll(handle: Handle[fs2.Task,A]): fs2.Pull[fs2.Task,Nothing,Unit] =
    handle.receiveOption {
      case None => Pull.done
      case Some((chunk, next)) =>
        Pull.eval(offer(chunk)).flatMap { continue =>
          if (continue) pushAll(next)
          else Pull.done
        }
    }

  val stepper: Pipe[fs2.Task,A,Nothing] = _.pull(pushAll)

  // Publisher side: register with the actor, then push every chunk of the source through it.
  val registration =
    fs2.Stream.eval(fs2.Task.unforkedAsync[Boolean] { cb =>
      bridge ! RegisterPublisher[AttemptFS2](cb)
    })
  val publisher = registration ++ fs2in.through(stepper)

  publisher.run.unsafeRunAsync {
    case Right(_) => bridge ! PublisherDone(None)
    case Left(rsn) => bridge ! PublisherDone(Some(rsn))
  }

  // Subscriber side: asks the actor for the next chunk.
  def request: Task[Option[Chunk[A]]] =
    Task.async[Option[Chunk[A]]] { cb =>
      bridge ! RequestChunk[AttemptScalaz,A](cb)
    }

  // Builds the Process by repeatedly requesting chunks until the actor answers with None.
  def drain: Process[Task,A] =
    Process.eval(request).flatMap {
      case None => Process.halt
      case Some(chunk) => Process.emitAll(chunk.toList) ++ drain
    }

  // Tells the actor that the subscriber finished, optionally with a failure.
  def signalDone(result: Option[Throwable]): Process[Task,Nothing] =
    Process.eval_(Task.delay(bridge ! SubscriberDone(result)))

  drain.onHalt {
    case Cause.End | Cause.Kill => (signalDone(None) ++ Process.halt).asFinalizer
    case Cause.Error(rsn) => (signalDone(Some(rsn)) ++ Process.fail(rsn)).asFinalizer
  }
}
/**
  * Converts a scalaz-stream `Process` into an fs2 `Stream`.
  *
  * A fresh `impl.syncActor` is created each time the resulting stream is evaluated. The process
  * is started in the background and offers its elements to the actor one by one, each wrapped in
  * a singleton chunk; the returned stream pulls those chunks from the same actor.
  *
  * @param in the process to convert
  * @return an fs2 stream emitting the elements of `in`
  */
def processToFs2[A](in:Process[Task,A]):Stream[fs2.Task,A] = Stream.suspend {
  import impl._

  val bridge = syncActor[AttemptScalaz, AttemptFS2, A]

  // Publisher side: registration with the actor.
  val register: Task[Boolean] =
    Task.async[Boolean] { cb => bridge ! RegisterPublisher[AttemptScalaz](cb) }

  // Offers a single element; the task completes with `true` when the publisher may go on.
  def offer(a: A): Task[Boolean] =
    Task.async[Boolean] { cb =>
      bridge ! OfferChunk[AttemptScalaz,A](Chunk.singleton(a), cb)
    }

  val publisher =
    Process.eval(register) ++ in.evalMap { a => offer(a) }.takeWhile(identity)

  publisher.run.runAsync {
    case \/-(_) => bridge ! PublisherDone(None)
    case -\/(rsn) => bridge ! PublisherDone(Some(rsn))
  }

  // Subscriber side: asks the actor for the next chunk.
  def request: fs2.Task[Option[Chunk[A]]] =
    fs2.Task.unforkedAsync[Option[Chunk[A]]] { cb =>
      bridge ! RequestChunk[AttemptFS2,A](cb)
    }

  // Stream fed from the actor; it ends once the actor answers with None.
  def drain: fs2.Stream[fs2.Task,A] =
    Stream.eval(request).flatMap {
      case None => Stream.empty // done
      case Some(chunk) => Stream.chunk(chunk) ++ drain
    }

  // Tells the actor that the subscriber finished, optionally with a failure.
  def signalDone(result: Option[Throwable]): fs2.Task[Unit] =
    fs2.Task.delay { bridge ! SubscriberDone(result) }

  drain
    .onError { rsn => Stream.eval_(signalDone(Some(rsn))) ++ Stream.fail(rsn) }
    .onFinalize { signalDone(None) }
}
/**
  * Converts an fs2 `Task` into a scalaz `Task`.
  *
  * When the resulting task is run, `in` is started with `unsafeRunAsync` and its `Either` result
  * is turned into a scalaz disjunction before being passed to the scalaz callback.
  *
  * @param in the fs2 task to convert
  * @return a scalaz task completing with the outcome of `in`
  */
def fs2Task2Task[A](in:fs2.Task[A]):Task[A] = {
  def register(cb: (Throwable \/ A) => Unit): Unit =
    in.unsafeRunAsync { outcome => cb(\/.fromEither(outcome)) }

  Task.async[A](register)
}
/**
  * Converts a scalaz `Task` into an fs2 `Task`.
  *
  * When the resulting task is run, `in` is started with `runAsync` and its disjunction result is
  * converted to an `Either` before being passed to the fs2 callback.
  *
  * The strategy used to invoke the fs2 callback used to be hard-coded to
  * `fs2.Strategy.sequential`, which runs the continuation on the very stack that completes `in`.
  * Converting many tasks in a row (e.g. inside `.repeat`) can then nest continuations until the
  * stack overflows. The strategy is now a parameter: supply a non-sequential `fs2.Strategy`
  * (explicitly or via implicit scope) to break that call chain. When none is available the
  * previous sequential behaviour is kept, so existing call sites compile and behave as before.
  *
  * @param in the scalaz task to convert
  * @param S  strategy used to run the fs2 callback; defaults to `fs2.Strategy.sequential`
  * @return an fs2 task completing with the outcome of `in`
  */
def task2fs2Task[A](in:Task[A])(implicit S: fs2.Strategy = fs2.Strategy.sequential):fs2.Task[A] =
  fs2.Task.async[A]({ cb => in.runAsync { r => cb(r.toEither) } })(S)
// Internal machinery shared by both conversions: result encodings for each library, the message
// protocol, and the actor that hands chunks from a publisher to a subscriber.
private object impl {
// Result encoding of fs2 callbacks.
type AttemptFS2[+A] = Either[Throwable,A]
// Result encoding of scalaz callbacks.
type AttemptScalaz[+A] = Throwable \/ A
// Builds success / failure values in the result encoding `F`.
sealed trait Attempt[F[+_]] {
def success[A](a:A):F[A]
// Reply telling a publisher to keep going (success(true)).
val continue:F[Boolean] = success(true)
// Reply telling a publisher to stop (success(false)).
val stop:F[Boolean] = success(false)
def failure(rsn:Throwable):F[Nothing]
// Some(rsn) becomes a failure, None becomes `stop`.
def fromResult(result:Option[Throwable]):F[Boolean] = result.map(failure).getOrElse(stop)
}
object Attempt {
// Instance for the Either encoding.
implicit val fs2Instance:Attempt[AttemptFS2] = new Attempt[AttemptFS2] {
def success[A](a: A): AttemptFS2[A] = Right(a)
def failure(rsn: Throwable): AttemptFS2[Nothing] = Left(rsn)
}
// Instance for the scalaz disjunction encoding.
implicit val processInstance: Attempt[AttemptScalaz]= new Attempt[AttemptScalaz] {
def success[A](a: A): AttemptScalaz[A] = \/-(a)
def failure(rsn: Throwable): AttemptScalaz[Nothing] = -\/(rsn)
}
}
// Messages understood by the actor. `F` is the result encoding of the callback carried.
// Publisher announces itself; `cb` is answered once it may start (or must stop / fail).
case class RegisterPublisher[F[+_]](cb: F[Boolean] => Unit) extends M[F,Nothing,Nothing]
// Subscriber asks for the next chunk; `cb` receives Some(chunk), None (end) or a failure.
case class RequestChunk[F[+_], A](cb: F[Option[Chunk[A]]] => Unit) extends M[Nothing,F,A]
// Publisher offers a chunk; `cb` is answered when it may offer the next one (or must stop / fail).
case class OfferChunk[F[+_], A](chunk:Chunk[A], cb: F[Boolean] => Unit) extends M[F,Nothing,A]
// Subscriber terminated; Some(rsn) when it terminated with a failure.
case class SubscriberDone(result:Option[Throwable]) extends M[Nothing,Nothing,Nothing]
// Publisher terminated; Some(rsn) when it terminated with a failure.
case class PublisherDone(result:Option[Throwable]) extends M[Nothing,Nothing,Nothing]
sealed trait M[+FIn[+_], +FOut[+_] ,+A]
// Creates the actor coordinating one publisher (callbacks encoded with FIn) and one subscriber
// (callbacks encoded with FOut). At most one publisher callback and one subscriber callback are
// parked at any time; a chunk is accepted only while a request is pending.
def syncActor[FIn[+_], FOut[+_], A](implicit FIn:Attempt[FIn], FOut:Attempt[FOut]): Actor[M[FIn,FOut,A]] = {
// Parked publisher callback, waiting for the subscriber to request data.
var upReady:Option[FIn[Boolean] => Unit] = None
// Parked subscriber callback, waiting for the publisher to offer a chunk.
var requestReady:Option[FOut[Option[Chunk[A]]] => Unit] = None
// None while running; Some(None) after normal termination, Some(Some(rsn)) after a failure.
var done:Option[Option[Throwable]] = None
Actor.actor[M[FIn, FOut, A]]({
// Publisher registers: release it immediately if a request is already pending, else park it.
// NOTE(review): `done` is not consulted here - confirm a registration arriving after
// termination cannot be left parked forever.
case register:RegisterPublisher[FIn @ unchecked] =>
if (requestReady.nonEmpty) register.cb(FIn.continue)
else upReady = Some(register.cb)
// Subscriber requests a chunk.
case request:RequestChunk[FOut @unchecked,A @unchecked] =>
done match {
// Still running: park the request and release the parked publisher callback, if any.
case None => requestReady = Some(request.cb) ; upReady.foreach { cb => cb(FIn.continue) } ; upReady = None
// Terminated normally: signal end of data.
case Some(None) => request.cb(FOut.success(None))
// Terminated with a failure: propagate it.
case Some(Some(rsn)) => request.cb(FOut.failure(rsn))
}
// Publisher offers a chunk.
case offer:OfferChunk[FIn @unchecked ,A @unchecked] =>
done match {
case None => requestReady match {
// A request is pending: deliver the chunk, then park the publisher until the next request.
case Some(rcb) => rcb(FOut.success(Some(offer.chunk))) ; upReady = Some(offer.cb) ; requestReady = None
// No request pending: treated as a protocol violation; the actor is marked failed and the
// offer is answered with that failure.
case None =>
val rsn = new Throwable(s"Downstream not ready, while requesting data : ${offer.chunk}")
done = Some(Some(rsn))
offer.cb(FIn.failure(rsn))
}
// Already terminated: answer with `stop` or with the recorded failure.
case Some(result) => offer.cb(FIn.fromResult(result))
}
// Subscriber finished: answer the parked publisher callback and record the first termination.
case SubscriberDone(result) =>
upReady.foreach { cb => cb(FIn.fromResult(result)) }
upReady = None
done = done orElse Some(result)
// Publisher finished: complete the parked request and record the first termination.
case PublisherDone(result) =>
requestReady.foreach { cb => result match {
case None => cb(FOut.success(None))
case Some(rsn) => cb(FOut.failure(rsn))
}}
requestReady = None
done = done orElse Some(result)
// NOTE(review): Strategy.Sequential presumably runs the handler on the thread that sends the
// message - confirm against the scalaz version in use.
})(scalaz.concurrent.Strategy.Sequential)
}
}
}
@dbousamra
Copy link

I get a StackOverflowError when task2fs2Task is called many times in a row (e.g. with .repeat).

Exception in thread "main" java.lang.StackOverflowError
	at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at fs2.StreamCore$$anonfun$step$2$$anon$5.liftedTree1$1(StreamCore.scala:257)
	at fs2.StreamCore$$anonfun$step$2$$anon$5.map(StreamCore.scala:257)
	at fs2.StreamCore$$anonfun$step$2$$anon$5.map(StreamCore.scala:232)
	at fs2.StreamCore$Stack$$anon$14$$anon$15.map(StreamCore.scala:436)
	at fs2.StreamCore$Stack$$anon$12.fold(StreamCore.scala:412)
	at fs2.StreamCore$Stack$$anon$14.fold(StreamCore.scala:431)
	at fs2.StreamCore$$anonfun$step$2.apply(StreamCore.scala:232)
	at fs2.StreamCore$$anonfun$step$2.apply(StreamCore.scala:229)
	at fs2.Scope$$anonfun$flatMap$1.apply(Scope.scala:18)
	at fs2.Scope$$anonfun$flatMap$1.apply(Scope.scala:18)
	at fs2.util.Free$$anonfun$step$1.apply(Free.scala:56)
	at fs2.util.Free$$anonfun$step$1.apply(Free.scala:56)
	at fs2.util.Free$Bind$$anonfun$_fold$2$$anonfun$apply$1.apply(Free.scala:134)
	at fs2.Scope$$anonfun$bindEnv$1$$anon$2.bind(Scope.scala:58)
	at fs2.Scope$$anonfun$bindEnv$1$$anon$2.bind(Scope.scala:34)
	at fs2.util.Free$Bind$$anonfun$_fold$2.apply(Free.scala:134)
	at fs2.util.Free$$anonfun$suspend$1.apply(Free.scala:84)
	at fs2.util.Free$$anonfun$suspend$1.apply(Free.scala:84)
	at fs2.util.Free$$anonfun$step$1.apply(Free.scala:56)
	at fs2.util.Free$$anonfun$step$1.apply(Free.scala:56)
	at fs2.util.Free$Bind$$anonfun$_runTranslate$1.apply(Free.scala:115)
	at fs2.Task$$anonfun$flatMap$1$$anonfun$1.apply(Task.scala:40)
	at fs2.Task$$anonfun$flatMap$1$$anonfun$1.apply(Task.scala:40)
	at fs2.util.Attempt$.apply(Attempt.scala:12)
	at fs2.Task$$anonfun$flatMap$1.apply(Task.scala:40)
	at fs2.Task$$anonfun$flatMap$1.apply(Task.scala:38)
	at fs2.internal.Future$$anonfun$flatMap$1.apply(Future.scala:17)
	at fs2.internal.Future$$anonfun$flatMap$1.apply(Future.scala:17)
	at fs2.internal.Future.step(Future.scala:53)
	at fs2.internal.Future.listen(Future.scala:30)
	at fs2.internal.Future$$anonfun$listen$1$$anonfun$apply$7.apply(Future.scala:34)
	at fs2.internal.Future$$anonfun$listen$1$$anonfun$apply$7.apply(Future.scala:34)
	at fs2.internal.Trampoline$$anonfun$map$1.apply(Trampoline.scala:10)
	at fs2.internal.Trampoline$$anonfun$map$1.apply(Trampoline.scala:10)
	at fs2.internal.Trampoline$.run(Trampoline.scala:31)
	at fs2.internal.Trampoline$class.run(Trampoline.scala:12)
	at fs2.internal.Trampoline$FlatMap.run(Trampoline.scala:18)
	at fs2.Task$$anonfun$async$2$$anonfun$apply$19$$anonfun$apply$2.apply$mcV$sp(Task.scala:247)
	at fs2.Strategy$$anon$7.apply(Strategy.scala:69)
	at fs2.Task$$anonfun$async$2$$anonfun$apply$19.apply(Task.scala:247)
	at fs2.Task$$anonfun$async$2$$anonfun$apply$19.apply(Task.scala:247)
	at com.imageintelligence.ava.dickens.FS2Helpers$$anonfun$task2fs2Task$1$$anonfun$apply$2.apply(FS2Helpers.scala:15)
	at com.imageintelligence.ava.dickens.FS2Helpers$$anonfun$task2fs2Task$1$$anonfun$apply$2.apply(FS2Helpers.scala:15)
	at scalaz.concurrent.Future$$anonfun$runAsync$1.apply(Future.scala:142)
	at scalaz.concurrent.Future$$anonfun$runAsync$1.apply(Future.scala:142)
	at scalaz.concurrent.Future.listen(Future.scala:76)
	at scalaz.concurrent.Future.runAsync(Future.scala:142)
	at scalaz.concurrent.Task.runAsync(Task.scala:153)
	at com.imageintelligence.ava.dickens.FS2Helpers$$anonfun$task2fs2Task$1.apply(FS2Helpers.scala:15)
	at com.imageintelligence.ava.dickens.FS2Helpers$$anonfun$task2fs2Task$1.apply(FS2Helpers.scala:15)
	at fs2.Task$$anonfun$async$2.apply(Task.scala:246)
	at fs2.Task$$anonfun$async$2.apply(Task.scala:246)
	at fs2.internal.Future.listen(Future.scala:34)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment