Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created June 2, 2023 18:24
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djspiewak/91cfab27c7c3ce7c133b79068c5bc6dd to your computer and use it in GitHub Desktop.
Save djspiewak/91cfab27c7c3ce7c133b79068c5bc6dd to your computer and use it in GitHub Desktop.
package sillio
import cats.syntax.all._
import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
final class IOFiber[A](_current: IO[A], executor: ExecutionContext) extends Fiber[A] with Runnable {
private[this] var current: IO[Any] = _current
private[this] var continuations: List[Either[Throwable, Any] => IO[Any]] =
{ oc => fireCompletion(oc.leftMap(_.some).map(_.asInstanceOf[A])); null } :: Nil
private[this] val listeners: AtomicReference[Set[Either[Option[Throwable], A] => Unit]] =
new AtomicReference(Set())
@volatile
private[this] var canceled: Boolean = false
@volatile
private[this] var outcome: Either[Option[Throwable], A] = null
// we don't have finalizers, so we don't need to sweat backpressure
def cancel: IO[Unit] =
IO {
canceled = true
fireCompletion(Left(None))
}
def join: IO[Either[Option[Throwable], A]] =
IO.async[Either[Option[Throwable], A]] { resume =>
onComplete(e => resume(Right(e)))
}
@tailrec
def run(): Unit = {
import IO._
if (!canceled) {
current match {
case null => ()
case Pure(value) =>
current = continue(Right(value))
run()
case Error(value) =>
current = continue(Left(value))
run()
case FlatMap(ioe: IO[e], f) =>
push {
case e @ Left(_) =>
continue(e)
case Right(value) =>
f(value.asInstanceOf[e])
}
current = ioe
run()
case HandleErrorWith(ioa, f) =>
push {
case Left(value) =>
f(value)
case e @ Right(_) =>
continue(e)
}
current = ioa
run()
case Async(k) =>
current = null
val done = new AtomicBoolean(false)
try {
k { e =>
if (!done.getAndSet(true) && !canceled) {
current = continue(e)
executor.execute(this)
}
}
} catch {
case NonFatal(t) =>
continue(Left(t))
case t: Throwable =>
executor.reportFailure(t)
System.exit(-1)
}
case Start(body) =>
val fiber = new IOFiber(body, executor)
executor.execute(fiber)
current = continue(Right(fiber))
run()
}
}
}
@tailrec
def onComplete(f: Either[Option[Throwable], A] => Unit): Unit = {
val ls = listeners.get()
if (ls == null) {
// race condition with fireCompletion; just chill for a second while it writes
while (outcome == null) {}
f(outcome)
} else {
val ls2 = ls + f
if (!listeners.compareAndSet(ls, ls2)) {
onComplete(f)
}
}
}
private[this] def fireCompletion(outcome: Either[Option[Throwable], A]): Unit = {
val ls = listeners.getAndSet(null)
if (ls != null) {
this.outcome = outcome
ls.foreach(_(outcome))
}
}
private[this] def continue(e: Either[Throwable, Any]): IO[Any] = {
// we never call this when it could be empty
val cont :: tail = continuations: @unchecked
continuations = tail
cont(e)
}
private[this] def push(cont: Either[Throwable, Any] => IO[Any]): Unit =
continuations ::= cont
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment