Skip to content

Instantly share code, notes, and snippets.

@ahoy-jon
Created March 17, 2019 22:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahoy-jon/f96c3ee74428c1f2ba3a5cb4920573c6 to your computer and use it in GitHub Desktop.
Save ahoy-jon/f96c3ee74428c1f2ba3a5cb4920573c6 to your computer and use it in GitHub Desktop.
/*
* Copyright 2017-2019 John A. De Goes and the ZIO Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package scalaz.zio
import scalaz.zio.Exit.Cause
import scalaz.zio.internal.Executor
package object blocking {
final def blockingExecutor: ZIO[Blocking, Nothing, Executor] =
ZIO.accessM(_.blocking.blockingExecutor)
final def blocking[R1 <: Blocking, E, A](zio: ZIO[R1, E, A]): ZIO[R1, E, A] =
blockingExecutor.flatMap(exec => zio.lock(exec))
final def interruptible[A](effect: => A): ZIO[Blocking, Throwable, A] =
ZIO.flatten(ZIO.effectTotal {
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
import scalaz.zio.internal.OneShot
val lock = new ReentrantLock()
val thread = new AtomicReference[Option[Thread]](None)
val barrier = OneShot.make[Unit]
def withMutex[B](b: => B): B =
try {
lock.lock(); b
} finally lock.unlock()
val interruptThread: UIO[Unit] =
ZIO.effectTotal {
var looping = true
var n = 0L
val base = 2L
while (looping) {
withMutex(thread.get match {
case None => looping = false; ()
case Some(thread) => thread.interrupt()
})
if (looping) {
n += 1
Thread.sleep(math.min(50, base * n))
}
}
}
val awaitInterruption: UIO[Unit] = ZIO.effectTotal(barrier.get())
for {
a <- (for {
fiber <- blocking(ZIO.effectTotal[Either[Cause[Throwable], A]] {
val current = Some(Thread.currentThread)
withMutex(thread.set(current))
try Right(effect)
catch {
case _: InterruptedException =>
Thread.interrupted // Clear interrupt status
Left(Cause.interrupt)
case t: Throwable =>
Left(Cause.fail(t))
} finally {
withMutex { thread.set(None); barrier.set(()) }
}
}).fork
a <- fiber.join.absolve.unsandbox
} yield a).ensuring(interruptThread *> awaitInterruption)
} yield a
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment