Created May 17, 2018 15:02
import cats.effect.Async
import org.apache.kafka.clients.producer.{
KafkaProducer => ApacheKafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.Serializer
import fs2._
import scala.util.Try
trait ProducerOps[F[_], K, V] {
def close: F[Unit]
def sendAsync(record: ProducerRecord[K, V]): F[Unit]
def sendSync(message: ProducerRecord[K, V]): F[RecordMetadata]
trait Producer[F[_], K, V] {
def sendSync: Pipe[F, ProducerRecord[K, V], RecordMetadata]
def sendAsync: Sink[F, ProducerRecord[K, V]]
object ProducerOps {
def apply[F[_], K: Serializer, V: Serializer](
underlyingProducer: ApacheKafkaProducer[K, V]
)(implicit F: Async[F]): ProducerOps[F, K, V] = new ProducerOps[F, K, V] {
def close: F[Unit] = F.delay {
def sendAsync(record: ProducerRecord[K, V]): F[Unit] =
F.delay {
def sendSync(message: ProducerRecord[K, V]): F[RecordMetadata] =
F.async[RecordMetadata] { cb =>
F.delay {
underlyingProducer.send(message, (metadata: RecordMetadata, exception: Exception) => {
if (exception eq null) {
} else {
object Producer {
def apply[F[_], K: Serializer, V: Serializer](
producerSettings: ProducerSettings[K, V]
)(implicit F: Async[F]): Producer[F, K, V] =
new Producer[F, K, V] {
private def createProducerOps: F[ProducerOps[F, K, V]] =
F.suspend {
val javaProps =
producerSettings.props.foldLeft(new java.util.Properties) {
case (p, (k, v)) => p.put(k, v); p
ProducerOps[F, K, V](
new ApacheKafkaProducer(
private def withOps[I, O](f: ProducerOps[F, K, V] => Pipe[F, I, O]): Pipe[F, I, O] =
(input: Stream[F, I]) => Stream.bracket(createProducerOps)(ops => f(ops)(input), _.close)
def sendSync: Pipe[F, ProducerRecord[K, V], RecordMetadata] = withOps { ops => input =>
def sendAsync: Sink[F, ProducerRecord[K, V]] = withOps { ops => input =>
final case class ProducerSettings[K: Serializer, V: Serializer](
props: Map[String, String] = Map.empty
) {
val keySerializer: Serializer[K] = util.serializer[K]
val valueSerializer: Serializer[V] = util.serializer[V]
def withProperty(key: String, value: String): ProducerSettings[K, V] =
ProducerSettings[K, V](props.updated(key, value))
def withBootstrapServers(bootstrapServers: String): ProducerSettings[K, V] =
withProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
def withAcks(acks: String): ProducerSettings[K, V] =
withProperty(ProducerConfig.ACKS_CONFIG, acks)
object util {
def serializer[T](implicit s: Serializer[T]): Serializer[T] = s
