Thread pools on the JVM should usually be divided into the following three categories:
- CPU-bound
- Blocking IO
- Non-blocking IO polling
Each of these categories has a different optimal configuration and usage pattern.
def blocking[F[_], A](fa: F[A])(implicit F: Async[F], timer: Timer[F]): F[A] = | |
F.bracket(Async.shift(blockingPool))(_ => ....)(_ => timer.shift) |
import cats.MonadError | |
import cats.effect.IO | |
import cats.effect.concurrent.Deferred | |
import cats.instances.list._ | |
import cats.instances.string._ | |
import cats.kernel.Monoid | |
import cats.syntax.all._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.duration._ |
import java.util.concurrent.CompletionStage | |
import cats.effect.Async | |
import cats.syntax.flatMap._ | |
case object EmptyValue extends Throwable | |
def to[F[_], A](fa: F[CompletionStage[A]])(implicit F: Async[F]): F[A] = { | |
fa.flatMap { f => | |
F.async[A] { cb => |
import cats.effect.{ExitCode, IO, IOApp} | |
import cats.instances.list._ | |
import cats.syntax.all._ | |
import fs2._ | |
import scala.concurrent.duration._ | |
object jobs extends IOApp { | |
val largeStream: Stream[IO, Int] = Stream.range(0, 100).covary[IO] |
There exist several DI frameworks / libraries
in the Scala
ecosystem. But the more functional code you write the more you'll realize there's no need to use any of them.
A few of the most claimed benefits are the following:
def scanEval[F[_]: Sync, S, A](p: Stream[F, A])(start: F[S])(f: (S, A) => F[S]): Stream[F, S] = { | |
def zipper(ref: Ref[F, S]): Stream[F, S] = | |
p.zip(Stream.eval(ref.get).repeat).evalMap { case (a, s) => | |
for { | |
ns <- f(s, a) | |
_ <- ref.set(ns) | |
} yield ns | |
} | |
for { |
type Arguments = Args[A] forSome { type A } | |
final class Args[A: SafeArgument](val underlying: Map[String, A]) | |
object Arguments { | |
def empty: ArgumentsAlt[String] = new ArgumentsAlt(Map.empty) | |
def apply[V: SafeArgument](kv: (String, V)*): ArgumentsAlt[V] = | |
new ArgumentsAlt(kv.toMap) | |
} |
package com.github.gvolpe.fs2rabbit.examples | |
import cats.effect.Effect | |
import com.github.gvolpe.fs2rabbit.config.QueueConfig | |
import com.github.gvolpe.fs2rabbit.interpreter.Fs2Rabbit | |
import com.github.gvolpe.fs2rabbit.json.Fs2JsonEncoder | |
import com.github.gvolpe.fs2rabbit.model._ | |
import com.github.gvolpe.fs2rabbit.typeclasses.StreamEval | |
import fs2.{Pipe, Stream} |