Skip to content

Instantly share code, notes, and snippets.

@sderosiaux
Created December 14, 2016 02:11
Show Gist options
  • Save sderosiaux/4fc4bc56898e33d7fd157362ab80863d to your computer and use it in GitHub Desktop.
Save sderosiaux/4fc4bc56898e33d7fd157362ab80863d to your computer and use it in GitHub Desktop.
How to use Kafka in tests (+ schemarepo)
import java.nio.file.Files
import kafka.admin.AdminUtils
import kafka.server.{KafkaConfig, KafkaServerStartable}
import kafka.utils.ZkUtils
import org.apache.curator.test.TestingServer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.schemarepo.{InMemoryRepository, SubjectConfig, ValidatorFactory}
import org.springframework.util.SocketUtils
import collection.JavaConversions._
object KafkaInfrastructure {
def apply(topics: Set[String], subjects: Set[String] = Set()) = new KafkaInfrastructure(topics, subjects)
}
class KafkaInfrastructure(topics: Set[String], subjectsSet: Set[String] = Set()) {
val zkServer = new TestingServer(SocketUtils.findAvailableTcpPort())
val logsDir = Files.createTempDirectory("test-kafka").toFile
logsDir.deleteOnExit()
val kafkaServer = new KafkaServerStartable(KafkaConfig(Map(
"broker.id" -> 1,
"zookeeper.connect" -> zkServer.getConnectString,
"log.dir" -> logsDir.getAbsolutePath,
"port" -> SocketUtils.findAvailableTcpPort()
// "host.name" -> "localhost",
// "batch.size" -> 1,
)))
kafkaServer.startup()
val broker = kafkaServer.serverConfig.hostName + ":" + kafkaServer.serverConfig.port
topics.foreach(AdminUtils.createTopic(ZkUtils(zkServer.getConnectString, 30000, 30000, isZkSecurityEnabled = false), _, 10, 1))
lazy val subjects = subjectsSet.map(s => s -> new InMemoryRepository(ValidatorFactory.EMPTY).register(s, SubjectConfig.emptyConfig())).toMap
lazy val subject = subjects.head._2
def consume(topic: String, timeout: Int = 2000) = {
val consumer = new KafkaConsumer(Map(
"bootstrap.servers" -> broker,
"key.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"group.id" -> "testid",
"auto.offset.reset" -> "earliest"
))
consumer.subscribe(List(topic))
val records = consumer.poll(timeout)
consumer.close()
records
}
def shutdown() = {
kafkaServer.shutdown()
zkServer.stop()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment