Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Last active October 8, 2020 15:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djspiewak/48ea7e7c2ad5cbd15452143296cf9898 to your computer and use it in GitHub Desktop.
Save djspiewak/48ea7e7c2ad5cbd15452143296cf9898 to your computer and use it in GitHub Desktop.
object Soviet {

  /**
   * Runs `runner` with a plain synchronous function `A => B` whose every call is
   * answered by evaluating `body` inside `F` on a background fiber.
   *
   * The hand-off works as follows:
   *  - a background fiber repeatedly registers an `async_` callback (wrapped by
   *    `adapt`) in `ref` and wakes up any thread waiting on `ref`;
   *  - the function given to `runner` (`redirect`) blocks until such a callback
   *    is available, takes it, and invokes it with the argument;
   *  - the callback resumes the fiber, which runs `body`, stores the `B` in
   *    `result` and releases `latch`; the blocked caller then returns that value.
   *
   * The background fiber is cancelled once `runner` finishes.
   *
   * NOTE(review): if `body` fails, `latch` is never released, so the thread
   * inside `redirect` appears to block indefinitely -- confirm whether callers
   * rely on `body` never raising.
   *
   * @param runner effect that is handed the synchronous `A => B` bridge
   * @param body   effectful implementation invoked for every bridge call
   * @return the result produced by `runner`
   */
  def repeated[F[_]: Async, A, B, R](runner: (A => B) => F[R])(body: A => F[B]): F[R] =
    for {
      ref <- Sync[F].delay(new AtomicReference[A => B])
      result <- Sync[F].delay(new AtomicReference[B])
      latch <- Sync[F].delay(new Semaphore(1))
      // Drain the only permit so the first `latch.acquire()` in `adapt` blocks
      // until a result has actually been published.
      _ <- Sync[F].delay(latch.acquire())
      r <- {
        // Turns an async callback into a blocking function: resume the fiber with
        // `a`, wait for the fiber to publish the result, then return it.
        def adapt(cb: Either[Throwable, A] => Unit): A => B = { a =>
          cb(Right(a))
          latch.acquire()
          result.get()
        }

        // The synchronous entry point handed to `runner`.
        def redirect(a: A): B = {
          // `wait()` must be called while holding the monitor of `ref`; the
          // registered function is taken under the same monitor so a wake-up
          // cannot be missed between the null check and the wait.
          val f = ref.synchronized {
            var pending: A => B = ref.getAndSet(null)
            while (pending == null) {
              ref.wait()
              pending = ref.getAndSet(null)
            }
            pending
          }
          // Invoke outside the monitor: `f` blocks on `latch`, and the fiber
          // needs the monitor to register its next callback.
          f(a)
        }

        val registerF = {
          val cont: F[A] = Async[F] async_ { cb =>
            // `notifyAll()` requires ownership of the monitor, otherwise it
            // throws IllegalMonitorStateException.
            ref.synchronized {
              ref.set(adapt(cb))
              ref.notifyAll()
            }
          }

          cont.flatMap(body) flatMap { b =>
            Sync[F] delay {
              result.set(b)
              latch.release()
            }
          }
        }

        registerF.foreverM.start.bracket({ _ =>
          runner(redirect _)
        })(_.cancel)
      }
    } yield r
}
/**
 * A value of `F` applied to some element type that is not exposed as a type
 * parameter; the element type is only reachable through the type member `E`.
 */
trait Exists[F[_]] {
// The hidden element type; left abstract here and fixed by implementations.
type E
// Returns the `F` value at the hidden element type.
def apply(): F[E]
}
/** Factory for [[Exists]] values. */
object Exists {

  /**
   * Packs `fa` into an [[Exists]], fixing the hidden element type `E` to `A`.
   *
   * @param fa the value to hide the element type of
   * @return an `Exists[F]` whose `apply()` returns `fa`
   */
  def apply[F[_], A](fa: F[A]): Exists[F] = {
    val packed: Exists[F] = new Exists[F] {
      type E = A
      def apply(): F[E] = fa
    }
    packed
  }
}
object Soviet {

  /**
   * Builds a resource in which `unsafe` is given a natural transformation
   * `F ~> Id` that runs an `F` effect and blocks the calling thread for its
   * result.
   *
   * A background fiber (`runner`, looped forever and tied to the resource
   * lifetime via `background`) serves the requests:
   *  - it registers an `async_` callback in `invokeRef` and releases
   *    `invokeLatch` to signal that it is ready;
   *  - `unsafeRunSync` acquires `invokeLatch`, publishes the effect together
   *    with a result cell and a `CountDownLatch` in `paramRef`, fires the
   *    registered callback, and blocks on the `CountDownLatch`;
   *  - the fiber evaluates the effect, stores the value in the cell and counts
   *    the latch down, unblocking the caller.
   *
   * NOTE(review): if the submitted effect fails, the latch is never counted
   * down, so the caller of the `F ~> Id` appears to block indefinitely --
   * confirm whether error propagation is expected here.
   *
   * @param unsafe program that needs a blocking `F ~> Id` runner
   * @return a resource yielding the result of `unsafe`
   */
  def apply[F[_]: Async, A](unsafe: (F ~> Id) => F[A]): Resource[F, A] =
    for {
      paramRef <- Resource.liftF(Sync[F].delay(new AtomicReference[Exists[λ[α => (F[α], AtomicReference[α], CountDownLatch)]]]))
      invokeRef <- Resource.liftF(Sync[F].delay(new AtomicReference[() => Unit]))
      invokeLatch <- Resource.liftF(Sync[F].delay(new Semaphore(1)))
      // Drain the only permit so `unsafeRunSync` blocks until the runner fiber
      // has registered a callback.
      _ <- Resource.liftF(Sync[F].delay(invokeLatch.acquire()))

      runner = {
        val cont: F[Unit] = Async[F] async_ { cb =>
          invokeRef.set(() => cb(Right(())))
          invokeLatch.release()
        }

        cont *> {
          Sync[F].delay(paramRef.get) flatMap { triple =>
            val (ft, ref, latch) = triple()

            // Evaluate the submitted effect (`ft`) and publish its result.
            ft flatMap { t =>
              Sync[F] delay {
                ref.set(t)
                latch.countDown()
              }
            }
          }
        }
      }

      _ <- runner.foreverM.background

      back <- Resource liftF {
        // Submits `fe` to the runner fiber and blocks until its result arrives.
        def unsafeRunSync[E](fe: F[E]): E = {
          val ref = new AtomicReference[E]
          val latch = new CountDownLatch(1)

          invokeLatch.acquire()
          // `Exists.apply` declares two type parameters, so both are supplied.
          paramRef.set(Exists[λ[α => (F[α], AtomicReference[α], CountDownLatch)], E]((fe, ref, latch)))
          invokeRef.get()()

          latch.await()
          ref.get()
        }

        unsafe(λ[F ~> Id](unsafeRunSync(_)))
      }
    } yield back
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment