Skip to content

Instantly share code, notes, and snippets.

@AlessandroStaffolani
Last active February 15, 2019 10:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AlessandroStaffolani/376f03e5258a13823ff10f658f6d5e9d to your computer and use it in GitHub Desktop.
Save AlessandroStaffolani/376f03e5258a13823ff10f658f6d5e9d to your computer and use it in GitHub Desktop.
Kafka Scala producer consumer test
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
/**
 * Thin wrapper around a Kafka consumer that reads String keys and String values
 * and prints every record it receives to standard output.
 *
 * The underlying consumer is created and subscribed to `topicValue` during construction.
 *
 * @param host       String with the host, or a list of hosts separated by commas
 * @param topicValue String with the topic to subscribe to initially
 * @param group      String with the consumer group id
 */
class Consumer(host: String, topicValue: String, group: String) {
  private var _topic = topicValue

  // consumeMessages() blocks its calling thread, so a stop request can only come from a
  // different thread; @volatile guarantees the polling loop actually observes the write.
  @volatile private var _stopConsuming = false

  private val _props = new Properties()
  _props.put("bootstrap.servers", host)
  _props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  _props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  _props.put("group.id", group)

  private val consumer = new KafkaConsumer[String, String](_props)
  consumer.subscribe(Collections.singletonList(_topic))

  /**
   * Getter for the topic value.
   *
   * @return String containing the current topic value
   */
  def topic: String = _topic

  /**
   * Setter for the topic value. Also re-subscribes the underlying consumer so that it
   * reads from the new topic only.
   *
   * @param value String containing the new topic value
   */
  def topic_=(value: String): Unit = {
    _topic = value
    consumer.subscribe(Collections.singletonList(_topic))
  }

  /**
   * Getter of the flag that stops the consuming loop.
   *
   * @return true when a stop has been requested
   */
  def stopConsuming: Boolean = _stopConsuming

  /**
   * Setter of the flag that stops the consuming loop.
   *
   * @param value true to make [[consumeMessages]] return after its current poll
   */
  def stopConsuming_=(value: Boolean): Unit = _stopConsuming = value

  /**
   * Polls the subscribed topic and prints each received record, repeating until
   * `stopConsuming` is set to true. Blocks the calling thread while it runs.
   */
  def consumeMessages(): Unit = {
    while (!_stopConsuming) {
      // Wait up to 100 ms for new records before re-checking the stop flag.
      val records = consumer.poll(100)
      records.asScala.foreach(record => println(record))
    }
  }

  /**
   * Close the underlying consumer and release its resources.
   *
   * Call this once [[consumeMessages]] has returned: KafkaConsumer is documented as not
   * safe for concurrent use, so it should not be closed while another thread is polling.
   */
  def close(): Unit = {
    consumer.close()
  }
}
/**
 * Demo entry point: builds a [[Consumer]] for a fixed broker address, topic and group id,
 * then prints incoming records. Nothing here sets the consumer's stop flag, so
 * `consumeMessages()` keeps running until the process is terminated.
 */
object Main extends App {
  val host: String = "localhost:9092"
  val topic: String = "topic"

  // Producer side of the demo, currently disabled:
  //   val producer = new Producer(host, topic)
  //   for (i <- 0 to 10) {
  //     var message = "Message (" + i + ")"
  //     producer.sendRecord( message)
  //     println("Message sent (" + message + ")")
  //   }

  val consumer: Consumer = new Consumer(host, topic, "group-id")

  // Blocks the main thread inside the polling loop.
  consumer.consumeMessages()
}
import java.util.Properties
import org.apache.kafka.clients.producer._
/**
 * Producer object for Kafka that sends String values (with no key) to a configurable topic.
 *
 * @param host        String with the host, or a list of hosts separated by commas
 * @param topic_value String with the topic that records are sent to initially
 */
class Producer(host: String, topic_value: String) {
  private var _topic = topic_value

  private val props = new Properties()
  props.put("bootstrap.servers", host)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](props)

  /**
   * Getter for the topic value.
   *
   * @return String containing the current topic value
   */
  def topic: String = _topic

  /**
   * Setter for the topic value. Only records sent after this call use the new topic.
   *
   * @param value String containing the new topic value
   */
  def topic_=(value: String): Unit = _topic = value

  /**
   * Send a new record to the current topic of this object.
   *
   * The send is asynchronous: this method returns without waiting for the outcome.
   * A failed send is reported on standard error instead of being silently dropped.
   *
   * @param content String with the record content
   */
  def sendRecord(content: String): Unit = {
    // Capture the topic now so the failure message names the topic this record was sent to,
    // even if the topic setter is called before the callback fires.
    val destination = _topic
    val record = new ProducerRecord[String, String](destination, content)
    producer.send(record, new Callback {
      // The exception argument is null when the send succeeded.
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        Option(exception).foreach { error =>
          Console.err.println(s"Failed to send record to topic '$destination': $error")
        }
    })
    ()
  }

  /**
   * Close the producer and release its resources.
   */
  def close(): Unit = {
    producer.close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment