Skip to content

Instantly share code, notes, and snippets.

@francescofrontera
Last active January 22, 2019 14:00
Show Gist options
  • Save francescofrontera/1f9c1827520a4903c6fd82c3b44a98a3 to your computer and use it in GitHub Desktop.
Save francescofrontera/1f9c1827520a4903c6fd82c3b44a98a3 to your computer and use it in GitHub Desktop.
Kafka utils DSL for creating topics
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
/** Utilities for creating Kafka topics through the Kafka `AdminClient`. */
object InternalTopicManager {
  import scala.collection.JavaConverters._

  /**
    * Removes every character that is neither a Unicode letter (`\p{L}`) nor a
    * Unicode decimal digit (`\p{Nd}`) from the given topic name.
    *
    * @param topicName the raw topic name
    * @return `topicName` with all non-alphanumeric characters removed
    */
  final def removeNoAlphanumericCharsFromTopic(topicName: String): String =
    topicName.replaceAll("[^\\p{L}\\p{Nd}]+", "")

  /**
    * Thin wrapper around a lazily created `AdminClient`.
    *
    * The client is only instantiated on the first topic-creation call. Call
    * [[close]] once the instance is no longer needed so the underlying client
    * is released; an instance should not be used after it has been closed.
    *
    * @param props configuration handed to `AdminClient.create`
    */
  case class KafkaInternalUtils(props: Properties) extends AutoCloseable {

    // Tracks whether the lazy client was actually built, so that close() does
    // not force the creation of a client merely to shut it down again.
    @volatile private var clientCreated = false

    private lazy val adminClient: AdminClient = {
      val client = AdminClient.create(props)
      // Only flagged after a successful create; a failed create leaves nothing to close.
      clientCreated = true
      client
    }

    /**
      * Creates a single topic and blocks until the request has completed.
      *
      * @param topicName         name of the topic to create
      * @param partitionNumber   number of partitions for the topic
      * @param replicationNumber replication factor for the topic
      * @return the result of `KafkaFuture[Void].get()`, which carries no information
      * @throws java.util.concurrent.ExecutionException if the creation fails
      * @throws java.lang.InterruptedException          if the waiting thread is interrupted
      */
    def createTopic(topicName: String, partitionNumber: Int, replicationNumber: Short): Void =
      createTopic(List(topicName), partitionNumber, replicationNumber)

    /**
      * Creates one topic per given name, all with the same partition count and
      * replication factor, and blocks until the whole request has completed.
      *
      * @param topicNames        names of the topics to create
      * @param partitionNumber   number of partitions for every topic
      * @param replicationNumber replication factor for every topic
      * @return the result of `KafkaFuture[Void].get()`, which carries no information
      * @throws java.util.concurrent.ExecutionException if the creation fails
      * @throws java.lang.InterruptedException          if the waiting thread is interrupted
      */
    def createTopic(topicNames: List[String], partitionNumber: Int, replicationNumber: Short): Void = {
      val newTopics = topicNames.map(new NewTopic(_, partitionNumber, replicationNumber))
      adminClient.createTopics(newTopics.asJavaCollection).all().get()
    }

    /** Closes the underlying `AdminClient` if it has been created; otherwise does nothing. */
    override def close(): Unit =
      if (clientCreated) adminClient.close()
  }

  /**
    * Builds a [[KafkaInternalUtils]] configured only with the given bootstrap servers.
    *
    * @param bootstrapServers value for `AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG`
    * @return a wrapper whose client will be created on first use
    */
  def apply(bootstrapServers: String): KafkaInternalUtils = {
    val props = new Properties()
    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    KafkaInternalUtils(props)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment