Skip to content

Instantly share code, notes, and snippets.

@jmcardon
Created July 13, 2017 20:24
Show Gist options
  • Save jmcardon/169b93dfecc665f3abb17f4e5fab3723 to your computer and use it in GitHub Desktop.
Save jmcardon/169b93dfecc665f3abb17f4e5fab3723 to your computer and use it in GitHub Desktop.
import scala.concurrent.duration._
object Hi extends App {
/**
 * Minimal generic wrapper around a single value.
 *
 * In this file it is the element type of the `keySerializerOpt` and
 * `valueSerializerOpt` fields of `ProducerSettings`.
 *
 * Declared `final`: nothing in this file extends it, and a case class that can be
 * subclassed no longer has trustworthy generated `equals`/`hashCode`/`copy`.
 *
 * @param k the wrapped value
 * @tparam K type of the wrapped value
 */
final case class A[K](k: K)
/**
 * Producer settings, held as an immutable value. See the `akka.kafka.producer` section in
 * reference.conf.
 *
 * Every `with*` method leaves the receiver untouched and returns a new instance in which
 * exactly one field has been replaced.
 *
 * NOTE(review): earlier documentation mentioned `apply` / `create` helpers on a companion
 * object; no companion is visible in this file, so instances are built with `new`.
 *
 * @param properties         raw properties of the kafka-clients driver, see constants in
 *                           `org.apache.kafka.clients.producer.ProducerConfig`
 * @param keySerializerOpt   optional `A[K]` occupying the key-serializer slot
 * @param valueSerializerOpt optional `A[V]` occupying the value-serializer slot
 * @param closeTimeout       close timeout carried by these settings
 * @param parallelism        parallelism value carried by these settings
 * @param dispatcher         dispatcher string carried by these settings
 */
final class ProducerSettings[K, V](
  val properties: Map[String, String],
  val keySerializerOpt: Option[A[K]],
  val valueSerializerOpt: Option[A[V]],
  val closeTimeout: FiniteDuration,
  val parallelism: Int,
  val dispatcher: String
) {

  /** New settings whose `properties` map `key` to `value`, replacing any earlier entry for `key`. */
  def withProperty(key: String, value: String): ProducerSettings[K, V] =
    rebuild(props = properties + (key -> value))

  /** New settings with `closeTimeout` replaced. */
  def withCloseTimeout(closeTimeout: FiniteDuration): ProducerSettings[K, V] =
    rebuild(timeout = closeTimeout)

  /** New settings with `parallelism` replaced. */
  def withParallelism(parallelism: Int): ProducerSettings[K, V] =
    rebuild(par = parallelism)

  /** New settings with `dispatcher` replaced. */
  def withDispatcher(dispatcher: String): ProducerSettings[K, V] =
    rebuild(disp = dispatcher)

  // Single construction point for the `with*` methods. Any argument left out falls back
  // to the current field value; the two serializer slots are always carried over as-is
  // because no public method replaces them.
  private def rebuild(
    props: Map[String, String] = properties,
    timeout: FiniteDuration = closeTimeout,
    par: Int = parallelism,
    disp: String = dispatcher
  ): ProducerSettings[K, V] =
    new ProducerSettings[K, V](
      properties = props,
      keySerializerOpt = keySerializerOpt,
      valueSerializerOpt = valueSerializerOpt,
      closeTimeout = timeout,
      parallelism = par,
      dispatcher = disp
    )
}
/**
 * Builds a JAAS configuration line that selects
 * `org.apache.kafka.common.security.plain.PlainLoginModule` with the given credentials.
 *
 * Both credentials are emitted inside double-quoted option values. Any backslash or
 * double quote they contain is backslash-escaped first, so a credential cannot end its
 * quoted value early or smuggle extra options into the line. Credentials without those
 * two characters produce exactly the same output as before.
 * (Assumes the JAAS config parser honours backslash escapes inside quoted values.)
 *
 * @param username SASL user name
 * @param password SASL password
 * @return the single-line JAAS configuration, terminated by `;`
 */
def getConnectionString(username: String, password: String): String = {
  // Backslashes are escaped before quotes so the backslashes introduced for the quotes
  // are not themselves doubled.
  def quoted(value: String): String =
    "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    s"username=${quoted(username)} password=${quoted(password)};"
}
/**
 * Baseline client properties shared by every connection built in this file:
 * `request.timeout.ms` set to `"20000"` and `retry.backoff.ms` set to `"500"`.
 *
 * @return a fresh two-entry property map
 */
def getGenericSettings(): Map[String, String] = {
  val requestTimeout = ("request.timeout.ms", "20000")
  val retryBackoff = ("retry.backoff.ms", "500")
  Map(requestTimeout, retryBackoff)
}
/**
 * Properties for a SASL_SSL / PLAIN authenticated connection.
 *
 * @param listener value stored under the `listeners` key
 * @param username user name embedded in the `sasl.jaas.config` entry
 * @param password password embedded in the `sasl.jaas.config` entry
 * @return a four-entry map: `listeners`, `security.protocol`, `sasl.mechanism`,
 *         `sasl.jaas.config`
 */
def getConfluentSettings(listener: String,
                         username: String,
                         password: String): Map[String, String] = {
  val jaasConfig = getConnectionString(username, password)

  // NOTE(review): `listeners` is normally a broker-side setting; Kafka clients usually
  // receive the address through `bootstrap.servers` — confirm this key is intended.
  val entries = List(
    ("listeners", listener),
    ("security.protocol", "SASL_SSL"),
    ("sasl.mechanism", "PLAIN"),
    ("sasl.jaas.config", jaasConfig)
  )
  entries.toMap
}
/**
 * Layers connection properties onto `settings` and returns the resulting settings.
 *
 * Properties are merged in increasing precedence:
 *  1. the SASL properties from `getConfluentSettings` — only when BOTH `username` and
 *     `password` are defined;
 *  2. the defaults from `getGenericSettings()`;
 *  3. the entries of `other`, when defined.
 *
 * When either credential is missing no SASL properties are added, and `listener` is not
 * applied at all. Each merged entry is written with `withProperty`, so it replaces any
 * property of the same key already present in `settings`.
 *
 * @param settings settings to start from; not modified
 * @param listener listener value forwarded to `getConfluentSettings`
 * @param username optional SASL user name
 * @param password optional SASL password
 * @param other    optional extra properties, applied last
 * @return a new `ProducerSettings` carrying the merged properties
 */
def connect[K, V](settings: ProducerSettings[K, V],
                  listener: String,
                  username: Option[String],
                  password: Option[String],
                  other: Option[Map[String, String]]): ProducerSettings[K, V] = {
  val overrides: Map[String, String] =
    getGenericSettings() ++ other.getOrElse(Map.empty[String, String])

  val sasl: Map[String, String] = (username, password) match {
    case (Some(user), Some(pass)) => getConfluentSettings(listener, user, pass)
    case _                        => Map.empty[String, String]
  }

  // `++` is right-biased: on a key collision the entry from `overrides` wins.
  (sasl ++ overrides).foldLeft(settings) {
    case (acc, (key, value)) => acc.withProperty(key, value)
  }
}
// Demo run: start from empty settings, connect with sample credentials, and print the
// resulting property map (no trailing newline).
// `5.seconds` replaces the postfix form `5 seconds`, which needs
// `scala.language.postfixOps` in scope and is otherwise flagged by the compiler.
val producer: ProducerSettings[String, String] =
  new ProducerSettings[String, String](Map(), None, None, 5.seconds, 2, "hi")
print(connect(producer, "hi", Some("kappa"), Some("keepo"), None).properties)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment