-
-
Save alexandru/b258f67ab1e21d61d06dcfd6ec73557a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicReference | |
import cats.effect.{ IO, Resource } | |
import com.ing.raptor.common.UnlawfulEffect | |
import org.reactivestreams.{ Publisher, Subscriber, Subscription } | |
def resourceAsPublisher[A](r: Resource[IO, A]): Publisher[A] = { | |
new Publisher[A] { | |
override def subscribe(s: Subscriber[_ >: A]): Unit = { | |
s.onSubscribe(new Subscription { | |
private[this] var phase: Long = 2 | |
private[this] val cancelable = new AtomicReference(IO.unit) | |
override def request(n: Long): Unit = { | |
if (n <= 0) { | |
s.onError(new IllegalArgumentException("n must be strictly positive")) | |
return | |
} else if (n > 1) { | |
// Oops!!! | |
s.onError(new IllegalArgumentException("resource will be closed immediately if buffered")) | |
return | |
} | |
phase = math.max(phase - n, 0) | |
phase match { | |
case 1 => | |
r.allocated.flatMap { | |
case (res, cancel) => | |
if (!cancelable.compareAndSet(IO.unit, cancel)) { | |
cancel *> IO(s.onComplete()) | |
} else { | |
IO(s.onNext(res)) | |
} | |
}.unsafeToFuture | |
case 0 => | |
closeAndSignal.unsafeToFuture | |
} | |
() | |
} | |
override def cancel(): Unit = { | |
UnlawfulEffect.unsafeToFuture(closeAndSignal) | |
() | |
} | |
private[this] val closeAndSignal: IO[Unit] = | |
IO.suspend { | |
val cancel = cancelable.getAndSet(null) | |
if (cancel != null) { | |
cancel *> IO(s.onComplete()) | |
} else { | |
IO.unit | |
} | |
} | |
}) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment