Skip to content

Instantly share code, notes, and snippets.

@Odomontois
Created February 11, 2021 12:22
Show Gist options
  • Save Odomontois/361d8e5fc5b8eda6ab11229f24abb7b3 to your computer and use it in GitHub Desktop.
Save Odomontois/361d8e5fc5b8eda6ab11229f24abb7b3 to your computer and use it in GitHub Desktop.
FromPublisher
package healer.reactive
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import cats.effect.Concurrent
import cats.syntax.foldable._
import cats.syntax.monoid._
import cats.{Monad, Monoid}
import healer.reactive.impl._
import io.iteratee.Enumerator
import io.iteratee.internal.Step
import org.reactivestreams.{Publisher, Subscriber, Subscription}
import tofu.syntax.monadic._
import scala.collection.immutable.ArraySeq
trait FromPublisher[F[_]] {
def toEnumerator[A](pub: Publisher[A], batchSize: Int): Enumerator[F, A]
def first[A](pub: Publisher[A]): F[A]
def firstOption[A](pub: Publisher[A]): F[Option[A]]
def foldLeftM[A, B](pub: Publisher[A], batchSize: Int = 100)(b: B)(f: (B, A) => F[B]): F[B]
def foldMapM[A, B: Monoid](pub: Publisher[A], batchSize: Int = 100)(f: A => F[B]): F[B]
def foldLeft[A, B](pub: Publisher[A])(b: B)(f: (B, A) => B): F[B]
def foldMap[A, B: Monoid](pub: Publisher[A])(f: A => B): F[B] = foldLeft(pub)(Monoid.empty[B])((b, a) => b |+| f(a))
def foreach[A](pub: Publisher[A])(f: A => F[Unit]): F[Unit]
}
object FromPublisher {
implicit def instance[F[_]: Concurrent]: FromPublisher[F] =
new FromPublisher[F] {
def first[A](pub: Publisher[A]): F[A] = impl.firstImpl[F, A, A](pub)(
a => Right(a),
Left(new NoSuchElementException("first: empty stream")),
)
def firstOption[A](pub: Publisher[A]): F[Option[A]] = impl.firstImpl[F, A, Option[A]](pub)(
a => Right(Some(a)),
Right(None)
)
def foldMapM[A, B: Monoid](pub: Publisher[A], batchSize: Int = 100)(f: A => F[B]): F[B] =
foldLeftM(pub, batchSize)(Monoid.empty[B])((b, a) => f(a).map(b |+| _))
def foldLeftM[A, B](pub: Publisher[A], batchSize: Int)(b: B)(f: (B, A) => F[B]): F[B] =
impl.startFold(pub, batchSize)(impl.foldLeftMLoop(b)(f))
def foreach[A](pub: Publisher[A])(f: A => F[Unit]): F[Unit] = foldMapM(pub)(f)
def toEnumerator[A](pub: Publisher[A], batchSize: Int): Enumerator[F, A] = new ToEnumerator(pub, batchSize)
def foldLeft[A, B](pub: Publisher[A])(b: B)(f: (B, A) => B): F[B] = foldLeftImpl(pub)(b, f)
}
}
private class First[A, B](success: A => Result[B], empty: => Result[B], k: Cont[B]) extends Subscriber[A] {
val canceled = new AtomicBoolean(false)
var sub: Subscription = null
def onSubscribe(s: Subscription): Unit =
if (canceled.get()) s.cancel()
else {
sub = s
s.request(1)
}
def onNext(a: A): Unit = {
k(success(a))
sub.cancel()
}
def onError(error: Throwable): Unit =
k(Left(error))
def onComplete(): Unit =
k(empty)
}
private class Fold[A, B](cont: Cont[B], init: B, f: (B, A) => B) extends Subscriber[A] {
val canceled = new AtomicBoolean(false)
var sub: Subscription = null
var b: B = init
def onSubscribe(s: Subscription): Unit =
if (canceled.get()) s.cancel()
else {
sub = s
s.request(1)
}
def onNext(a: A): Unit =
if (canceled.get()) sub.cancel()
else {
b = f(b, a)
sub.request(1)
}
def onError(err: Throwable): Unit = cont(Left(err))
def onComplete(): Unit = cont(Right(b))
}
private class FoldM[F[_], A](pub: Publisher[A], batchSize: Int)(implicit F: Concurrent[F]) { self =>
private val canceled = new AtomicBoolean(false)
val cancel = F.delay(canceled.set(true))
private class Sub(init: Cont[State[F, A]]) extends Subscriber[A] {
var count: Int = 0
val builder = Array.ofDim[Any](batchSize).asInstanceOf[Array[A]]
var sub: Subscription = null
val cont = new AtomicReference(init)
def onSubscribe(s: Subscription): Unit =
if (canceled.get()) s.cancel()
else {
sub = s
s.request(batchSize.toLong)
}
def onNext(a: A): Unit =
if (canceled.get()) sub.cancel()
else {
builder(count) = a
count += 1
if (count >= batchSize) {
val chunk = builder.toSeq
val next: F[State[F, A]] = F.async { k =>
this.cont.set(k)
sub.request(batchSize.toLong)
}
count = 0
cont.get()(Right(State(chunk, Some(next))))
}
}
def onError(err: Throwable): Unit = cont.get()(Left(err))
def onComplete(): Unit = {
val finalChunk = builder.take(count).toSeq
cont.get()(Right(State(finalChunk, None)))
}
}
def initDemand: F[State[F, A]] =
F.cancelable { k =>
pub.subscribe(new Sub(k))
cancel
}
}
private class ToEnumerator[F[_]: Concurrent, A](publisher: Publisher[A], batchSize: Int) extends Enumerator[F, A] {
private def loop[R](step: Step[F, A, R])(state: State[F, A]): F[Step[F, A, R]] =
if (step.isDone) step.pure[F]
else step.feed(state.chunk).flatMap(nextStep => state.next.fold(nextStep.pure[F])(_.flatMap(loop(nextStep))))
def apply[R](step: Step[F, A, R]): F[Step[F, A, R]] = impl.startFold(publisher, batchSize)(loop(step))
}
private object impl {
final case class State[F[_], A](chunk: Seq[A], next: Option[F[State[F, A]]])
type Result[+A] = Either[Throwable, A]
type Cont[-A] = Either[Throwable, A] => Unit
def firstImpl[F[_], A, B](
pub: Publisher[A]
)(f: A => Result[B], empty: => Result[B])(implicit F: Concurrent[F]): F[B] =
F.cancelable[B] { k =>
val sub = new First[A, B](f, empty, k)
pub.subscribe(sub)
F.delay { sub.canceled.set(true) }
}
def foldLeftImpl[F[_], A, B](pub: Publisher[A])(init: B, f: (B, A) => B)(implicit F: Concurrent[F]): F[B] =
F.cancelable[B] { k =>
val sub = new Fold[A, B](k, init, f)
pub.subscribe(sub)
F.delay(sub.canceled.set(true))
}
def startFold[F[_], A, B](pub: Publisher[A], batchSize: Int)(
loop: State[F, A] => F[B]
)(implicit F: Concurrent[F]): F[B] =
F.delay(new FoldM[F, A](pub, batchSize)).flatMap { folder =>
F.bracket(folder.initDemand)(loop)(_ => folder.cancel)
}
def foldLeftMLoop[F[_]: Monad, A, B](b: B)(f: (B, A) => F[B]): State[F, A] => F[B] = {
def loop(state: State[F, A], b: B): F[B] = {
val fb = ArraySeq.from[Any](state.chunk).asInstanceOf[ArraySeq[A]].foldLeftM(b)(f)
state.next.fold(fb)(getState =>
for {
b <- fb
state <- getState
res <- loop(state, b)
} yield res
)
}
loop(_, b)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment