Created
May 17, 2018 15:02
-
-
Save danicheg/f554e73a8c4e541db673a67117625ef9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.Async | |
import org.apache.kafka.clients.producer.{ | |
ProducerRecord, | |
RecordMetadata, | |
KafkaProducer => ApacheKafkaProducer | |
} | |
import org.apache.kafka.clients.producer.ProducerConfig | |
import org.apache.kafka.common.serialization.Serializer | |
import fs2._ | |
import scala.util.Try | |
/** Low-level operations on a single Kafka producer, expressed in the effect type `F`.
  *
  * @tparam F effect type that every operation is returned in
  * @tparam K key type of the records being produced
  * @tparam V value type of the records being produced
  */
trait ProducerOps[F[_], K, V] {

  /** Sends `message` and yields the [[RecordMetadata]] for it. */
  def sendSync(message: ProducerRecord[K, V]): F[RecordMetadata]

  /** Sends `record` without yielding any information about the outcome. */
  def sendAsync(record: ProducerRecord[K, V]): F[Unit]

  /** Closes the producer these operations act on. */
  def close: F[Unit]
}
/** Stream-level producer API: records flow in through an fs2 stream.
  *
  * @tparam F effect type of the streams
  * @tparam K key type of the records being produced
  * @tparam V value type of the records being produced
  */
trait Producer[F[_], K, V] {

  /** Sink consuming producer records and emitting only `Unit` values. */
  def sendAsync: Sink[F, ProducerRecord[K, V]]

  /** Pipe turning each incoming producer record into a [[RecordMetadata]]. */
  def sendSync: Pipe[F, ProducerRecord[K, V], RecordMetadata]
}
object ProducerOps {

  /** Wraps an already-constructed Kafka producer in a [[ProducerOps]].
    *
    * Every operation is suspended in `F`; nothing touches `underlyingProducer`
    * until the returned effect is run.
    *
    * @param underlyingProducer the Kafka producer every operation delegates to
    * @param F                  `Async` instance used to suspend calls and to register callbacks
    * @tparam F effect type
    * @tparam K record key type
    * @tparam V record value type
    * @return operations backed by `underlyingProducer`
    */
  def apply[F[_], K: Serializer, V: Serializer](
    underlyingProducer: ApacheKafkaProducer[K, V]
  )(implicit F: Async[F]): ProducerOps[F, K, V] = new ProducerOps[F, K, V] {

    /** Suspends a call to `close()` on the underlying producer. */
    def close: F[Unit] = F.delay {
      underlyingProducer.close()
    }

    /** Hands `record` to the producer and discards whatever `send` returns,
      * so the outcome of the send is not observable through this effect.
      */
    def sendAsync(record: ProducerRecord[K, V]): F[Unit] =
      F.delay {
        underlyingProducer.send(record)
        ()
      }

    /** Sends `message` and completes with the metadata passed to the producer
      * callback, or fails with the exception passed to it.
      */
    def sendSync(message: ProducerRecord[K, V]): F[RecordMetadata] =
      F.async[RecordMetadata] { cb =>
        // `send` must be invoked directly here. Wrapping it in `F.delay` only
        // builds an effect value that is thrown away, so the record would
        // never be sent and `cb` would never fire.
        try {
          underlyingProducer.send(message, (metadata: RecordMetadata, exception: Exception) => {
            // The callback signals success with a null exception.
            if (exception eq null) {
              cb(Right(metadata))
            } else {
              cb(Left(exception))
            }
          })
          ()
        } catch {
          // `send` can also fail synchronously; surface that through the
          // callback rather than relying on how `F` treats a throwing registration.
          case scala.util.control.NonFatal(e) => cb(Left(e))
        }
      }
  }
}
object Producer {

  /** Builds a stream-level [[Producer]] from `producerSettings`.
    *
    * Each returned pipe creates its own Kafka producer when the stream is run
    * and closes it through `Stream.bracket`'s release action.
    *
    * @param producerSettings properties and serializers used to construct the Kafka producer
    * @param F                `Async` instance for the effect type
    * @tparam F effect type of the resulting streams
    * @tparam K record key type
    * @tparam V record value type
    */
  def apply[F[_], K: Serializer, V: Serializer](
    producerSettings: ProducerSettings[K, V]
  )(implicit F: Async[F]): Producer[F, K, V] =
    new Producer[F, K, V] {

      // Copies the settings' key/value pairs into a fresh java.util.Properties.
      private def javaProperties: java.util.Properties = {
        val properties = new java.util.Properties
        producerSettings.props.foreach { case (key, value) => properties.put(key, value) }
        properties
      }

      // Constructs the Kafka producer lazily inside `F`; a failure captured by
      // `Try` becomes a failed effect via `F.fromTry`.
      private def createProducerOps: F[ProducerOps[F, K, V]] =
        F.suspend {
          val properties = javaProperties
          val attempt = Try {
            val kafkaProducer = new ApacheKafkaProducer[K, V](
              properties,
              producerSettings.keySerializer,
              producerSettings.valueSerializer
            )
            ProducerOps[F, K, V](kafkaProducer)
          }
          F.fromTry(attempt)
        }

      // Runs the pipe produced by `f` against freshly created producer ops,
      // with `close` registered as the bracket's release action.
      private def withOps[I, O](f: ProducerOps[F, K, V] => Pipe[F, I, O]): Pipe[F, I, O] =
        (in: Stream[F, I]) =>
          Stream.bracket(createProducerOps)(
            ops => f(ops)(in),
            ops => ops.close
          )

      /** Evaluates `ProducerOps.sendSync` for every incoming record and emits its metadata. */
      def sendSync: Pipe[F, ProducerRecord[K, V], RecordMetadata] =
        withOps[ProducerRecord[K, V], RecordMetadata] { ops => records =>
          records.evalMap(ops.sendSync)
        }

      /** Evaluates `ProducerOps.sendAsync` for every incoming record. */
      def sendAsync: Sink[F, ProducerRecord[K, V]] =
        withOps[ProducerRecord[K, V], Unit] { ops => records =>
          records.evalMap(ops.sendAsync)
        }
    }
}
/** Immutable configuration for constructing a producer.
  *
  * The key and value serializers are taken from the implicit [[Serializer]]
  * instances required by the context bounds.
  *
  * @param props producer configuration entries, keyed by property name
  * @tparam K record key type
  * @tparam V record value type
  */
final case class ProducerSettings[K: Serializer, V: Serializer](
  props: Map[String, String] = Map.empty
) {

  /** Serializer for record keys, resolved from implicit scope. */
  val keySerializer: Serializer[K] = util.serializer[K]

  /** Serializer for record values, resolved from implicit scope. */
  val valueSerializer: Serializer[V] = util.serializer[V]

  /** Returns new settings with `key` set to `value`, replacing any existing entry for `key`. */
  def withProperty(key: String, value: String): ProducerSettings[K, V] =
    copy(props = props + (key -> value))

  /** Returns new settings with `ProducerConfig.BOOTSTRAP_SERVERS_CONFIG` set to `bootstrapServers`. */
  def withBootstrapServers(bootstrapServers: String): ProducerSettings[K, V] =
    withProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)

  /** Returns new settings with `ProducerConfig.ACKS_CONFIG` set to `acks`. */
  def withAcks(acks: String): ProducerSettings[K, V] =
    withProperty(ProducerConfig.ACKS_CONFIG, acks)
}
object util {

  /** Summons the implicit [[Serializer]] for `T`.
    *
    * @param s the serializer instance, normally supplied from implicit scope
    * @tparam T type the serializer handles
    * @return `s` itself, unchanged
    */
  def serializer[T](implicit s: Serializer[T]): Serializer[T] = {
    s
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment