Skip to content

Instantly share code, notes, and snippets.

@dosht
Last active July 7, 2020 13:06
Show Gist options
  • Save dosht/6086d0c701be3d8e40088df6eed5c0d9 to your computer and use it in GitHub Desktop.
Save dosht/6086d0c701be3d8e40088df6eed5c0d9 to your computer and use it in GitHub Desktop.
import java.util.concurrent.{ExecutorService, Executors}
import cats.effect.{ContextShift, IO, Resource}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
class KafkaContext(cs: ContextShift[IO]) {
// A thread pool with exactly 1 thread
private val threadPool = Executors.newFixedThreadPool(1)
protected val synchronousExecutionContext = ExecutionContext.fromExecutor(threadPool)
def execute[A](f: => A): IO[A] = cs.evalOn(synchronousExecutionContext)(IO(f))
// Alias for execute
def ~>[A](f: => A): IO[A] = execute(f)
def close(): IO[Unit] = IO(pool.shutdown())
}
object KafkaContext {
def resource(cs: ContextShift[IO]): Resource[IO, KafkaContext] =
Resource.make(IO(new KafkaContext(cs)))(_.close())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment