Skip to content

Instantly share code, notes, and snippets.

@danicheg
Created May 17, 2018 15:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danicheg/f554e73a8c4e541db673a67117625ef9 to your computer and use it in GitHub Desktop.
Save danicheg/f554e73a8c4e541db673a67117625ef9 to your computer and use it in GitHub Desktop.
import cats.effect.Async
import org.apache.kafka.clients.producer.{
ProducerRecord,
RecordMetadata,
KafkaProducer => ApacheKafkaProducer
}
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.Serializer
import fs2._
import scala.util.Try
/**
 * Effectful operations on a single Kafka producer, each returned as a value in `F`.
 *
 * @tparam F effect type in which every operation is described
 * @tparam K record key type
 * @tparam V record value type
 */
trait ProducerOps[F[_], K, V] {
/** Effect that closes the producer. */
def close: F[Unit]
/** Effect that sends `record` and yields no result. */
def sendAsync(record: ProducerRecord[K, V]): F[Unit]
/** Effect that sends `message` and yields the `RecordMetadata` reported for it. */
def sendSync(message: ProducerRecord[K, V]): F[RecordMetadata]
}
/**
 * Stream-level interface for publishing `ProducerRecord`s.
 *
 * @tparam F effect type of the streams
 * @tparam K record key type
 * @tparam V record value type
 */
trait Producer[F[_], K, V] {
/** Pipe that takes records as input and emits one `RecordMetadata` per record. */
def sendSync: Pipe[F, ProducerRecord[K, V], RecordMetadata]
/** Sink that consumes records; it produces no meaningful output values. */
def sendAsync: Sink[F, ProducerRecord[K, V]]
}
/** Builds `ProducerOps` instances on top of an already-constructed Apache Kafka producer. */
object ProducerOps {

  /**
   * Wraps `underlyingProducer` so that every operation is expressed as an `F` effect.
   *
   * Nothing is sent or closed until the returned effects are actually run.
   *
   * @param underlyingProducer the Kafka client all operations delegate to
   * @return operations backed by `underlyingProducer`
   */
  def apply[F[_], K: Serializer, V: Serializer](
    underlyingProducer: ApacheKafkaProducer[K, V]
  )(implicit F: Async[F]): ProducerOps[F, K, V] = new ProducerOps[F, K, V] {

    /** Closes the underlying producer when the effect is run. */
    def close: F[Unit] = F.delay {
      underlyingProducer.close()
    }

    /**
     * Hands `record` to the underlying producer without waiting for the outcome.
     *
     * The `Future` returned by `send` is deliberately discarded, so a failure
     * reported later by the client is not surfaced through `F`.
     */
    def sendAsync(record: ProducerRecord[K, V]): F[Unit] =
      F.delay {
        underlyingProducer.send(record)
        ()
      }

    /**
     * Sends `message` and completes once the producer callback fires, yielding the
     * reported `RecordMetadata` or failing with the reported exception.
     */
    def sendSync(message: ProducerRecord[K, V]): F[RecordMetadata] =
      F.async[RecordMetadata] { cb =>
        // `send` has to be invoked directly in the registration function. Wrapping
        // it in `F.delay` here only builds a suspended effect that nobody runs, so
        // the record would never be sent and `cb` would never be called.
        underlyingProducer.send(message, (metadata: RecordMetadata, exception: Exception) => {
          // The callback signals success with a null exception.
          if (exception eq null) cb(Right(metadata))
          else cb(Left(exception))
        })
        ()
      }
  }
}
/** Factory for stream-based `Producer`s backed by the Apache Kafka client. */
object Producer {

  /**
   * Creates a `Producer` whose pipes each acquire their own Kafka producer.
   *
   * A fresh client is built from `producerSettings` when a pipe's stream starts
   * and is closed by the surrounding `Stream.bracket` release action.
   *
   * @param producerSettings properties and serializers used to construct the client
   * @return a `Producer` exposing `sendSync` and `sendAsync` pipes
   */
  def apply[F[_], K: Serializer, V: Serializer](
    producerSettings: ProducerSettings[K, V]
  )(implicit F: Async[F]): Producer[F, K, V] =
    new Producer[F, K, V] {

      // Construction is suspended in `F`, so an exception thrown while building
      // the client surfaces as a failed effect rather than being thrown eagerly.
      private def createProducerOps: F[ProducerOps[F, K, V]] =
        F.delay {
          val javaProps = new java.util.Properties
          producerSettings.props.foreach { case (key, value) => javaProps.put(key, value) }
          ProducerOps[F, K, V](
            new ApacheKafkaProducer(
              javaProps,
              producerSettings.keySerializer,
              producerSettings.valueSerializer
            )
          )
        }

      // Runs `f` against a bracketed ProducerOps: acquire, use on the input, then close.
      private def withOps[I, O](f: ProducerOps[F, K, V] => Pipe[F, I, O]): Pipe[F, I, O] =
        (records: Stream[F, I]) =>
          Stream.bracket(createProducerOps)(
            ops => f(ops)(records),
            ops => ops.close
          )

      /** Evaluates `ProducerOps.sendSync` for every incoming record, emitting its metadata. */
      def sendSync: Pipe[F, ProducerRecord[K, V], RecordMetadata] =
        withOps { ops => records =>
          records.evalMap(record => ops.sendSync(record))
        }

      /** Evaluates `ProducerOps.sendAsync` for every incoming record. */
      def sendAsync: Sink[F, ProducerRecord[K, V]] =
        withOps { ops => records =>
          records.evalMap(record => ops.sendAsync(record))
        }
    }
}
/**
 * Immutable producer configuration: a property map plus the key and value
 * serializers resolved from the implicit scope at construction time.
 *
 * @param props producer properties as string key/value pairs; empty by default
 * @tparam K record key type
 * @tparam V record value type
 */
final case class ProducerSettings[K: Serializer, V: Serializer](
  props: Map[String, String] = Map.empty
) {

  /** Serializer for record keys, taken from the context bound on `K`. */
  val keySerializer: Serializer[K] = implicitly[Serializer[K]]

  /** Serializer for record values, taken from the context bound on `V`. */
  val valueSerializer: Serializer[V] = implicitly[Serializer[V]]

  /** Returns new settings with `key` set to `value`, replacing any existing entry. */
  def withProperty(key: String, value: String): ProducerSettings[K, V] = {
    val updatedProps = props + (key -> value)
    ProducerSettings[K, V](updatedProps)
  }

  /** Returns new settings with `ProducerConfig.BOOTSTRAP_SERVERS_CONFIG` set. */
  def withBootstrapServers(bootstrapServers: String): ProducerSettings[K, V] =
    withProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)

  /** Returns new settings with `ProducerConfig.ACKS_CONFIG` set. */
  def withAcks(acks: String): ProducerSettings[K, V] =
    withProperty(ProducerConfig.ACKS_CONFIG, acks)
}
/** Small helpers shared by the producer code. */
object util {

  /** Summons the implicit `Serializer[T]` that is in scope at the call site. */
  def serializer[T](implicit s: Serializer[T]): Serializer[T] =
    implicitly[Serializer[T]]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment