Skip to content

Instantly share code, notes, and snippets.

@diesalbla
Created June 14, 2023 12:05
Show Gist options
  • Save diesalbla/5fadcd75d075617564b4aad403fbf810 to your computer and use it in GitHub Desktop.
Save diesalbla/5fadcd75d075617564b4aad403fbf810 to your computer and use it in GitHub Desktop.
Scala Coroutines - Basic definitions
abstract class Starter[A]:
def invoke(completion: Continuation[A]): A | Any | Null
trait ContinuationLib:
extension [A](continuation: Continuation[A])
def intercepted(ec: ExecutionContext): Continuation[A] =
continuation match
case x: ContinuationImpl =>
x.intercepted(ec).asInstanceOf[Continuation[A]]
case _ => continuation
trait StarterLib extends ContinuationLib:
extension [A](starter: Starter[A])
def start(completion: Continuation[A]): Unit =
create(completion).intercepted(completion.executionContext).resume(())
def create(completion: Continuation[A]): Continuation[Unit] =
if (starter.isInstanceOf[ContinuationImpl])
starter.create(completion)
else
starter.createFromCompletion(completion)
def createFromCompletion(completion: Continuation[A]): Continuation[Unit] =
val context = completion.context()
if (context == EmptyTuple)
new RestrictedContinuation(completion.asInstanceOf):
private var label = 0
override protected def invokeSuspend(
result: Either[Throwable, Any | Null | Continuation.State.Suspended.type]): Any |
Null =
label match
case 0 =>
label = 1
result match
case Left(exception) =>
throw exception
case _ => ()
starter.invoke(this)
case 1 =>
label = 2
result match
case Left(exception) =>
throw exception
case Right(result) =>
result
case _ => throw new IllegalStateException("already completed")
else
new ContinuationImpl(completion.asInstanceOf, context):
private var label = 0
override def invokeSuspend(
result: Either[Throwable, Any | Null | Continuation.State.Suspended.type])
: Any | Null | Continuation.State.Suspended.type =
label match
case 0 =>
label = 1
result match
case Left(exception) =>
throw exception
case _ => ()
starter.invoke(this)
case 1 =>
label = 2
result match
case Left(exception) =>
throw exception
case Right(result) =>
result
case _ => throw new IllegalStateException("already completed")
abstract class BaseContinuationImpl(
val completion: Continuation[Any | Null] | Null,
icontext: Tuple
) extends Continuation[Any | Null],
ContinuationStackFrame,
Serializable:
override type Ctx = Tuple
override def context() = icontext
override val executionContext: ExecutionContext =
if completion == null then throw RuntimeException("resume called with no completion")
else completion.executionContext
final override def resume(result: Any | Null): Unit = resumeAux(Right(result))
final override def raise(error: Throwable): Unit =
resumeAux(Left(error))
private def resumeAux(result: Either[Throwable, Any | Null]): Unit = {
var current = this
var param = result
while true do
if (completion == null) throw RuntimeException("resume called with no completion")
val outcome: Either[Throwable, Any | Null] =
try
val outcome = current.invokeSuspend(param)
if (outcome == Continuation.State.Suspended) return
Right(outcome)
catch
case exception: Throwable =>
Left(exception)
releaseIntercepted()
completion match
case base: BaseContinuationImpl =>
current = base
param = outcome
println(s"base raise/resume")
outcome.fold(current.raise, current.resume)
return
case _ =>
println(s"completion raise/resume")
outcome.fold(completion.raise, completion.resume)
return
}
protected def invokeSuspend(
result: Either[Throwable, Any | Null | Continuation.State.Suspended.type]): Any | Null
protected def releaseIntercepted(): Unit = ()
def create(completion: Continuation[?]): Continuation[Unit] =
throw UnsupportedOperationException("create(Continuation) has not been overridden")
def create(value: Any | Null, completion: Continuation[Any | Null]): Continuation[Unit] =
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
override def callerFrame: ContinuationStackFrame | Null =
if (completion != null && completion.isInstanceOf[ContinuationStackFrame])
completion.asInstanceOf
else null
override def getStackTraceElement(): StackTraceElement | Null =
null
abstract class ContinuationImpl(
completion: Continuation[Any | Null],
icontext: Tuple
) extends BaseContinuationImpl(completion, icontext):
override type Ctx = Tuple
override def context() = icontext
override val executionContext: ExecutionContext = completion.executionContext
private var _intercepted: Continuation[Any | Null] = null
def intercepted(ec: ExecutionContext): Continuation[Any | Null] =
if (_intercepted != null) _intercepted
else
val interceptor = contextService[ContinuationInterceptor]()
val intercepted =
if (interceptor != null)
interceptor.interceptContinuation(
this
)
else this
_intercepted = intercepted
intercepted
override def releaseIntercepted(): Unit =
val intercepted = _intercepted
if (intercepted != null && intercepted != this)
val interceptor = contextService[ContinuationInterceptor]()
if (interceptor != null)
interceptor.releaseInterceptedContinuation(intercepted)
_intercepted = CompletedContinuation
object CompletedContinuation extends Continuation[Any | Null]:
override type Ctx = Nothing
override val executionContext: ExecutionContext = ???
override def context: CompletedContinuation.Ctx =
throw IllegalStateException("Already completed")
override def resume(result: Any | Null): Unit =
throw IllegalStateException("Already completed")
override def raise(error: Throwable): Unit =
throw IllegalStateException("Already completed")
abstract class RestrictedContinuation(
completion: Continuation[Any | Null] | Null,
) extends BaseContinuationImpl(completion, EmptyTuple):
if (completion != null)
require(completion.context() == EmptyTuple)
trait ContinuationStackFrame:
def callerFrame: ContinuationStackFrame | Null
def getStackTraceElement(): StackTraceElement | Null
type CancellationException = java.util.concurrent.CancellationException
class SafeContinuation[T] private (
val delegate: Continuation[T],
initialResult: T | Continuation.State)
extends SafeContinuationBase,
Continuation[T],
ContinuationStackFrame:
override type Ctx = delegate.Ctx
override val executionContext: ExecutionContext = delegate.executionContext
override def context(): Ctx = delegate.context()
result = initialResult
var errored: Boolean = false
override def resume(value: T): Unit =
while true do
this.result match {
case Continuation.State.Undecided =>
if (CAS_RESULT(Continuation.State.Undecided, value))
return ()
case Continuation.State.Suspended =>
if (CAS_RESULT(Continuation.State.Suspended, Continuation.State.Resumed)) {
delegate.resume(value)
return ()
}
case _ =>
throw IllegalStateException("Already resumed")
}
override def raise(error: Throwable): Unit =
while true do
val cur = this.result
if (cur == Continuation.State.Undecided) {
if (CAS_RESULT(Continuation.State.Undecided, error))
errored = true
return delegate.raise(error)
} else if (cur == Continuation.State.Suspended) {
if (CAS_RESULT(Continuation.State.Suspended, Continuation.State.Resumed)) {
errored = true
delegate.raise(error)
return ()
}
} else throw IllegalStateException("Already resumed")
def getOrThrow(): T | Null | Continuation.State.Suspended.type =
var result = this.result
if (result == Continuation.State.Undecided) {
if (CAS_RESULT(Continuation.State.Undecided, Continuation.State.Suspended)) {
return Continuation.State.Suspended
}
result = this.result
}
if (result == Continuation.State.Resumed) {
Continuation.State.Suspended
} else if ((result ne null) && errored) {
throw result.asInstanceOf[Throwable]
} else
result.asInstanceOf[T]
override def callerFrame: ContinuationStackFrame | Null =
if (delegate != null && delegate.isInstanceOf[ContinuationStackFrame]) delegate.asInstanceOf
else null
override def getStackTraceElement(): StackTraceElement | Null =
null
object SafeContinuation:
def init[A](cont: Continuation[A]): SafeContinuation[A] =
val intrinsic = cont.intercepted(cont.executionContext)
new SafeContinuation[A](intrinsic, Continuation.State.Undecided)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment