Last active
March 14, 2019 16:10
-
-
Save corenti13711539/fcd52f1fba7ccf7a57eeb5a92a26b3d9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fs2.Stream | |
import fs2.concurrent.Queue | |
import cats.implicits._ | |
import cats.effect.implicits._ | |
import cats.effect.{Concurrent, Resource} | |
import scala.language.higherKinds | |
abstract class JobSubmission[F[_], A]{ | |
def submit(a: A): F[Unit] | |
} | |
object JobSubmission { | |
def impl[F[_]: Concurrent, A](f: A => F[Unit], | |
maxSize: Int, | |
maxConcurrentJobs: Int | |
): Resource[F, JobSubmission[F, A]] = { | |
for { | |
q <- Resource.liftF(Queue.bounded[F, A](maxSize)) | |
drainFiber <- Resource.liftF(q.dequeue | |
.map(a => Stream.eval_(f(a))) | |
.parJoin(maxConcurrentJobs) | |
.compile.drain.start) | |
alg <- Resource.make( | |
new JobSubmission[F, A] { | |
def submit(a: A): F[Unit] = q.enqueue1(a) | |
}.pure[F] | |
)(_ => drainFiber.cancel) | |
} yield alg | |
} | |
} | |
def demo[F[_] : Concurrent]() = { | |
def processSubmission(msg: String) = println(s"processing data: '$msg'").pure[F] | |
val jsr = JobSubmission.impl(processSubmission, 5, 5) | |
jsr.use { js => | |
js.submit("hello") | |
js.submit("world") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment