Created March 27, 2021 20:55
-
-
Save djspiewak/8755dc00979e1c601788082fc64865e6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala | |
index ea00de579..0af2fde00 100644 | |
--- a/core/shared/src/main/scala/cats/effect/IOFiber.scala | |
+++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala | |
@@ -26,7 +26,7 @@ import scala.concurrent.duration._ | |
import scala.util.control.NonFatal | |
import java.util.concurrent.RejectedExecutionException | |
-import java.util.concurrent.atomic.AtomicBoolean | |
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} | |
import scala.util.control.NoStackTrace | |
/* | |
@@ -112,7 +112,8 @@ private final class IOFiber[A]( | |
private[this] val TypeBlocking = Sync.Type.Blocking | |
/* mutable state for resuming the fiber in different states */ | |
- private[this] var resumeTag: Byte = ExecR | |
+ private[this] val resumeTag = new AtomicInteger(ExecR) | |
+ | |
private[this] var resumeIO: IO[Any] = startIO | |
/* prefetch for Right(()) */ | |
@@ -128,10 +129,10 @@ private final class IOFiber[A]( | |
private[this] val autoYieldThreshold = runtime.config.autoYieldThreshold | |
override def run(): Unit = { | |
- // insert a read barrier after every async boundary | |
+ // insert a read barrier after every async boundary (TODO: is this still needed now that resumeTag is read via an atomic getAndSet?) | |
readBarrier() | |
try { | |
- (resumeTag: @switch) match { | |
+ (resumeTag.getAndSet(DoneR): @switch) match { | |
case 0 => execR() | |
case 1 => asyncContinueSuccessfulR() | |
case 2 => asyncContinueFailedR() | |
@@ -238,7 +239,7 @@ private final class IOFiber[A]( | |
asyncCancel(null) | |
} else if (iteration >= autoYieldThreshold) { | |
resumeIO = cur0 | |
- resumeTag = AutoCedeR | |
+ resumeTag.set(AutoCedeR) | |
rescheduleFiber(currentCtx)(this) | |
} else { | |
// This is a modulo operation in disguise. `iteration` is reset every time | |
@@ -568,10 +569,10 @@ private final class IOFiber[A]( | |
val ec = currentCtx | |
e match { | |
case Left(t) => | |
- resumeTag = AsyncContinueFailedR | |
+ resumeTag.set(AsyncContinueFailedR) | |
objectState.push(t) | |
case Right(a) => | |
- resumeTag = AsyncContinueSuccessfulR | |
+ resumeTag.set(AsyncContinueSuccessfulR) | |
objectState.push(a.asInstanceOf[AnyRef]) | |
} | |
execute(ec)(this) | |
@@ -749,7 +750,7 @@ private final class IOFiber[A]( | |
/* Cede */ | |
case 16 => | |
- resumeTag = CedeR | |
+ resumeTag.set(CedeR) | |
rescheduleFiber(currentCtx)(this) | |
case 17 => | |
@@ -795,7 +796,7 @@ private final class IOFiber[A]( | |
ctxs.push(ec) | |
conts.push(EvalOnK) | |
- resumeTag = EvalOnR | |
+ resumeTag.set(EvalOnR) | |
resumeIO = cur.ioa | |
execute(ec)(this) | |
} | |
@@ -805,7 +806,7 @@ private final class IOFiber[A]( | |
/* we know we're on the JVM here */ | |
if (cur.hint eq TypeBlocking) { | |
- resumeTag = BlockingR | |
+ resumeTag.set(BlockingR) | |
resumeIO = cur | |
runtime.blocking.execute(this) | |
} else { | |
@@ -838,7 +839,7 @@ private final class IOFiber[A]( | |
*/ | |
masks = initMask | |
- resumeTag = DoneR | |
+ resumeTag.set(DoneR) | |
resumeIO = null | |
/* | |
* Write barrier to publish masks. The thread which owns the runloop is | |
@@ -1093,10 +1094,10 @@ private final class IOFiber[A]( | |
} | |
if (error == null) { | |
- resumeTag = AfterBlockingSuccessfulR | |
+ resumeTag.set(AfterBlockingSuccessfulR) | |
objectState.push(r.asInstanceOf[AnyRef]) | |
} else { | |
- resumeTag = AfterBlockingFailedR | |
+ resumeTag.set(AfterBlockingFailedR) | |
objectState.push(error) | |
} | |
currentCtx.execute(this) | |
@@ -1198,7 +1199,7 @@ private final class IOFiber[A]( | |
val ec = popContext() | |
if (!shouldFinalize()) { | |
- resumeTag = AfterBlockingSuccessfulR | |
+ resumeTag.set(AfterBlockingSuccessfulR) | |
objectState.push(result.asInstanceOf[AnyRef]) | |
execute(ec)(this) | |
} else { | |
@@ -1212,7 +1213,7 @@ private final class IOFiber[A]( | |
val ec = popContext() | |
if (!shouldFinalize()) { | |
- resumeTag = AfterBlockingFailedR | |
+ resumeTag.set(AfterBlockingFailedR) | |
objectState.push(t) | |
execute(ec)(this) | |
} else { |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.