Skip to content

Instantly share code, notes, and snippets.

@l15k4
Last active May 17, 2016 23:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save l15k4/5cdd6be19ec2fcb9a3a32c23f0f6866c to your computer and use it in GitHub Desktop.
Save l15k4/5cdd6be19ec2fcb9a3a32c23f0f6866c to your computer and use it in GitHub Desktop.
package com.example
import java.util.Properties
import akka.event.Logging
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.stream.scaladsl.{Sink, Source}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import org.scalatest.time.{Second, Seconds, Span}
/**
 * Playground suite that wires Akka Streams Kafka against an embedded Kafka broker.
 *
 * The single test produces a batch of messages to `topic1`, pipes them to `topic2`
 * through a committable source / sink pair, and finally consumes them from `topic2`,
 * printing how long the whole round trip took.
 */
class KafkaPlayground extends AkkaSuite {

  // Upper bound used by `whenReady`: wait up to 980 seconds, polling once per second.
  implicit val futurePatience: PatienceConfig =
    PatienceConfig(timeout = Span(980, Seconds), interval = Span(1, Second))

  // Ports the embedded Kafka broker and its ZooKeeper instance bind to.
  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)

  /**
   * Runs the parent set-up first and only then boots the embedded broker and creates
   * the two topics used by the test.
   *
   * The broker is deliberately NOT started when `super.beforeAll()` throws: a failing
   * `beforeAll` is not guaranteed to be followed by `afterAll` (ScalaTest's
   * `BeforeAndAfterAll` skips it — presumably what `AkkaSuite` builds on, TODO confirm),
   * so starting Kafka in a `finally` would leak a running broker and its ports.
   * For the same reason the broker is stopped again if topic creation fails.
   */
  override def beforeAll(): Unit = {
    super.beforeAll()
    EmbeddedKafka.start()
    try {
      EmbeddedKafka.createCustomTopic("topic1", new Properties())
      EmbeddedKafka.createCustomTopic("topic2", new Properties())
    } catch {
      case scala.util.control.NonFatal(e) =>
        EmbeddedKafka.stop()
        throw e
    }
  }

  /** Stops the embedded broker even when the parent tear-down fails. */
  override def afterAll(): Unit =
    try super.afterAll()
    finally EmbeddedKafka.stop()

  "publish events from topic to topic" in {
    val start = System.currentTimeMillis()

    // Number of messages pushed through the topic1 -> topic2 pipeline.
    val messageCount = 1000
    // Derived from the embedded broker configuration so the port is defined in one place.
    val bootstrapServers = s"localhost:${config.kafkaPort}"

    val producerSettings =
      ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServers)

    // Consumer settings for a single topic; every consumer reads from the earliest offset.
    def consumerSettings(topic: String, clientId: String) =
      ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer, Set(topic))
        .withBootstrapServers(bootstrapServers)
        .withClientId(clientId)
        .withGroupId("group1")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // produce the initial batch of messages to topic1
    Source(1 to messageCount)
      .map(msg => new ProducerRecord[Array[Byte], String]("topic1", msg.toString))
      .map { x => println("producing-topic1"); x }
      .runWith(Producer.plainSink(producerSettings))

    // publish events from topic1 to topic2
    Consumer.committableSource(consumerSettings("topic1", "client1"))
      .map { msg =>
        Producer.Message(
          new ProducerRecord[Array[Byte], String]("topic2", msg.value),
          msg.committableOffset
        )
      }
      .map { x => println("piping"); x }
      .to(Producer.commitableSink(producerSettings))
      .run()

    // collect everything that arrived on topic2
    val sf =
      Consumer.atMostOnceSource(consumerSettings("topic2", "client2"))
        .take(messageCount)
        .map { x => println("consuming-topic2"); x }
        .runWith(Sink.seq)

    whenReady(sf) { result =>
      println(s"took${System.currentTimeMillis() - start}")
      // Verify the whole batch made it through, instead of only printing timings.
      assert(result.size == messageCount)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment