Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created October 7, 2020 17:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djspiewak/496a7dded2360ff6e750fefd2fa357b5 to your computer and use it in GitHub Desktop.
Save djspiewak/496a7dded2360ff6e750fefd2fa357b5 to your computer and use it in GitHub Desktop.
object Soviet {

  /**
   * Inverts control between a synchronous, callback-driven `runner` and an effectful `body`.
   *
   * `runner` is handed a plain function `A => B`. Each time `runner` invokes that function with
   * an `a`, the value is delivered to a background fiber which evaluates `body(a)` in `F`; the
   * calling thread blocks until the resulting `B` has been published and then returns it.
   *
   * The background fiber loops forever (`foreverM`) and is cancelled once `runner` completes.
   *
   * @param runner synchronous driver that receives the blocking `A => B` bridge
   * @param body   effectful handler evaluated for every `A` the runner passes to the bridge
   * @tparam F effect type, must have an `Async` instance
   * @tparam A value passed by the runner into the bridge
   * @tparam B value computed by `body` and returned to the runner
   * @tparam R final result produced by `runner`
   * @return the result of `runner`
   */
  def repeated[F[_]: Async, A, B, R](runner: (A => B) => F[R])(body: A => F[B]): F[R] =
    for {
      // Slot holding the handler for the *next* call made by the runner; null means "none yet".
      ref <- Sync[F].delay(new AtomicReference[A => B])
      // Slot through which the fiber hands the computed B back to the blocked caller.
      result <- Sync[F].delay(new AtomicReference[B])
      latch <- Sync[F].delay(new Semaphore(1))
      // Drain the single permit so the first `latch.acquire()` in `adapt` blocks until a
      // result has actually been published.
      _ <- Sync[F].delay(latch.acquire())
      r <- {
        // Wraps an async callback as a blocking function: complete the callback with `a`,
        // park until the fiber releases the latch, then read the published result.
        // NOTE(review): `latch.release()` only runs after `body` succeeds; if `body` fails,
        // nothing visible here wakes this thread - confirm whether that is acceptable.
        def adapt(cb: Either[Throwable, A] => Unit): A => B = { a =>
          cb(Right(a))
          latch.acquire()
          result.get()
        }

        // The function given to `runner`: take the currently registered handler (waiting for
        // one to appear if necessary) and invoke it.
        def redirect(a: A): B = {
          // The emptiness check and the wait must happen under the same monitor that the
          // publisher uses for set + notifyAll; otherwise `wait()` throws
          // IllegalMonitorStateException and a notification could slip in between the check
          // and the wait.
          val handler: A => B = ref.synchronized {
            var taken: A => B = ref.getAndSet(null)
            while (taken == null) {
              ref.wait()
              taken = ref.getAndSet(null)
            }
            taken
          }
          // Invoke outside the monitor: the handler blocks on the latch, and the fiber needs
          // the monitor to register the next handler.
          handler(a)
        }

        val registerF = {
          val cont: F[A] = Async[F].async_ { cb =>
            // Publish the handler and wake any thread parked in `redirect`, holding the
            // monitor as `notifyAll()` requires.
            ref.synchronized {
              ref.set(adapt(cb))
              ref.notifyAll()
            }
          }

          cont.flatMap(body).flatMap { b =>
            Sync[F].delay {
              // Publish the result before releasing the latch so the woken caller sees it.
              result.set(b)
              latch.release()
            }
          }
        }

        // Run the register/handle loop in the background for as long as `runner` is active.
        registerF.foreverM.start.bracket({ _ =>
          runner(redirect _)
        })(_.cancel)
      }
    } yield r
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment