Skip to content

Instantly share code, notes, and snippets.

@corenti13711539
Last active March 14, 2019 16:10
Show Gist options
  • Save corenti13711539/fcd52f1fba7ccf7a57eeb5a92a26b3d9 to your computer and use it in GitHub Desktop.
Save corenti13711539/fcd52f1fba7ccf7a57eeb5a92a26b3d9 to your computer and use it in GitHub Desktop.
import fs2.Stream
import fs2.concurrent.Queue
import cats.implicits._
import cats.effect.implicits._
import cats.effect.{Concurrent, Resource}
import scala.language.higherKinds
/**
 * Minimal algebra for handing a job over to some job processor.
 *
 * @tparam F effect type in which a submission is expressed
 * @tparam A type of the job payload
 */
abstract class JobSubmission[F[_], A] {

  /**
   * Describes the submission of a single job.
   *
   * What "submitted" means (queued, executed, ...) is decided by the concrete
   * implementation; this abstract class only fixes the shape of the operation.
   *
   * @param a the job payload to submit
   * @return an effect that performs the submission when run
   */
  def submit(a: A): F[Unit]
}
object JobSubmission {

  /**
   * Builds a [[JobSubmission]] backed by a bounded queue and a background worker.
   *
   * Submitted jobs are enqueued; a worker fiber dequeues them and runs `f` on
   * each one, joining the resulting single-job streams with
   * `parJoin(maxConcurrentJobs)`.
   *
   * The worker fiber is acquired and released by one `Resource.make`, so its
   * `cancel` finalizer is registered together with the `start`. Releasing the
   * resource cancels the worker; jobs that are still queued or running at that
   * point are not awaited first.
   *
   * NOTE(review): an error raised by `f` is expected to fail the joined stream
   * and thereby stop the worker for all later submissions -- confirm against
   * the fs2 `parJoin` semantics and wrap `f` if jobs must be isolated.
   *
   * @param f                 effectful handler invoked for every submitted job
   * @param maxSize           capacity passed to `Queue.bounded`
   * @param maxConcurrentJobs bound passed to `parJoin`
   * @tparam F effect type; must have a `Concurrent` instance
   * @tparam A type of the job payload
   * @return a resource yielding the submission handle; releasing it cancels the worker
   */
  def impl[F[_]: Concurrent, A](f: A => F[Unit],
                                maxSize: Int,
                                maxConcurrentJobs: Int
                               ): Resource[F, JobSubmission[F, A]] = {

    // Handle exposed to callers: submitting a job just enqueues it.
    def submissionFor(q: Queue[F, A]): JobSubmission[F, A] =
      new JobSubmission[F, A] {
        def submit(a: A): F[Unit] = q.enqueue1(a)
      }

    // Worker: every dequeued job becomes a one-effect stream running `f`.
    def worker(q: Queue[F, A]): F[Unit] =
      q.dequeue
        .map(a => Stream.eval_(f(a)))
        .parJoin(maxConcurrentJobs)
        .compile
        .drain

    for {
      q <- Resource.liftF(Queue.bounded[F, A](maxSize))
      // Start and cancel are paired in a single Resource so the fiber cannot
      // be started without its finalizer being registered.
      _ <- Resource.make(worker(q).start)(_.cancel)
    } yield submissionFor(q)
  }
}
/**
 * Small demonstration: builds a [[JobSubmission]] for `String` jobs whose
 * handler prints each message, then submits "hello" followed by "world".
 *
 * The two submissions are sequenced with `*>`; written as two separate
 * statements the first `F[Unit]` would be discarded and never run.
 *
 * NOTE(review): the worker is cancelled as soon as `use` completes, so the
 * messages may be cancelled before they are printed -- confirm whether the
 * demo should wait for processing.
 *
 * @tparam F effect type; must have a `Concurrent` instance
 * @return an effect that acquires the job processor, submits both messages and releases it
 */
def demo[F[_] : Concurrent](): F[Unit] = {
  // Suspend the side effect so it runs when the returned effect is evaluated,
  // not when the effect value is constructed.
  def processSubmission(msg: String): F[Unit] =
    Concurrent[F].delay(println(s"processing data: '$msg'"))

  val jsr = JobSubmission.impl[F, String](processSubmission, 5, 5)
  jsr.use { js =>
    js.submit("hello") *> js.submit("world")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment