@koushikmln
Last active May 20, 2018 12:07
Retail Log Producer Using Kafka
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import scala.sys.process._
// Producer configuration: the broker list, a client id that shows up in broker-side
// logs and metrics, and String serializers for both keys and values.
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "rm01.itversity.com:6667,nn02.itversity.com:6667,nn01.itversity.com:6667")
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ScalaProducerExample")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
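// Publishes one log line to the "Kafka-Testing" topic. send() is asynchronous and returns
// immediately. Every record carries the same constant key "Key", so the default
// partitioner routes them all to a single partition.
def processLine(line: String, producer: KafkaProducer[String, String]): Unit = {
  val data = new ProducerRecord[String, String]("Kafka-Testing", "Key", line)
  producer.send(data)
}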
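// Follow the access log with `tail -f` and publish each new line as it is appended.
// The stream never ends on its own, so close the producer (flushing buffered records) at JVM shutdown.
val file = "/opt/gen_logs/logs/access.log"
val tail = Seq("tail", "-f", file)
sys.addShutdownHook(producer.close())
tail.lineStream.foreach(processLine(_, producer))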