package ingestionservicemonix

import monix.kafka._
import monix.reactive.Observable
import org.apache.kafka.clients.producer.ProducerRecord
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.Await
import scala.concurrent.duration._

/**
  * Created by yury.liavitski on 23/11/16.
  */
object IngestionServiceMonix extends App {

  import monix.execution.Scheduler

  // Dedicated I/O scheduler for the blocking Kafka producer calls
  val io = Scheduler.io("Kafka-scheduler")

  // Initializing the producer
  val producerCfg = KafkaProducerConfig.default.copy(
    bootstrapServers = List("node1:31000")
  )

  val producer = KafkaProducerSink[String, String](producerCfg, io)

  // Let's pretend we have this observable of records:
  // one record per second, sent to partition 0 of the "test" topic with a null key
  val observable: Observable[ProducerRecord[String, String]] = Observable
    .interval(1.second)
    .map(_ => new ProducerRecord[String, String]("test", 0, null, "mymessage4"))

  val done = observable
    // on overflow, start dropping incoming events
    .whileBusyDropEvents
    // buffer into batches if the consumer is busy, up to a max size
    .bufferIntrospective(1024)
    // consume everything by pushing into Apache Kafka
    .runWith(producer)
    // ready, set, go!
    .runAsync

  Await.result(done, 120.seconds)
}
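
To check that the records produced above actually land on the "test" topic, the consuming side of monix-kafka can be used. The sketch below is an assumption-laden companion, not part of the original gist: the object name ConsumerCheckMonix, the groupId value, and the take(10) cut-off are illustrative, and the exact KafkaConsumerObservable signature may differ between monix-kafka versions.

package ingestionservicemonix

import monix.kafka._
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.Await
import scala.concurrent.duration._

object ConsumerCheckMonix extends App {

  // Consumer config pointing at the same broker as the producer above;
  // the groupId is an arbitrary name chosen for this check
  val consumerCfg = KafkaConsumerConfig.default.copy(
    bootstrapServers = List("node1:31000"),
    groupId = "ingestion-service-check"
  )

  // KafkaConsumerObservable emits ConsumerRecord[String, String] values
  val consumed = KafkaConsumerObservable[String, String](consumerCfg, List("test"))
    .take(10)            // stop after ten records for this check
    .map(_.value())
    .toListL
    .runAsync

  println(Await.result(consumed, 120.seconds))
}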