Last active
February 15, 2019 10:15
-
-
Save AlessandroStaffolani/376f03e5258a13823ff10f658f6d5e9d to your computer and use it in GitHub Desktop.
Kafka Scala producer consumer test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.{Collections, Properties} | |
import org.apache.kafka.clients.consumer.KafkaConsumer | |
import scala.collection.JavaConverters._ | |
/**
 * Thin wrapper around a String/String KafkaConsumer subscribed to a single topic.
 *
 * The underlying consumer is created and subscribed to the topic as soon as
 * the object is constructed.
 *
 * @param host String with host or list of hosts separated by comma (used as `bootstrap.servers`)
 * @param topicValue String with the initial topic value
 * @param group String with the group id value (used as `group.id`)
 */
class Consumer(host: String, topicValue: String, group: String) {
  private var _topic = topicValue
  private val _props = new Properties()

  // consumeMessages() blocks its calling thread, so this flag can only be
  // flipped from a different thread; it must be volatile for the polling
  // loop to be guaranteed to observe the new value.
  @volatile private var _stopConsuming = false

  _props.put("bootstrap.servers", host)
  _props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  _props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  _props.put("group.id", group)

  private val consumer = new KafkaConsumer[String, String](_props)
  consumer.subscribe(Collections.singletonList(_topic))

  /**
   * Getter for topic value
   * @return String containing the topic value
   */
  def topic: String = _topic

  /**
   * Setter for the topic value; also re-subscribes the underlying consumer
   * to the new topic.
   *
   * NOTE(review): KafkaConsumer is documented as not safe for multi-threaded
   * access, so calling this while another thread is inside consumeMessages()
   * presumably fails — confirm before relying on it.
   *
   * @param value String containing the new topic value
   */
  def topic_=(value: String): Unit = {
    _topic = value
    consumer.subscribe(Collections.singletonList(_topic))
  }

  /**
   * Getter of variable to stop consuming messages
   * @return Boolean, true when the polling loop has been asked to stop
   */
  def stopConsuming: Boolean = _stopConsuming

  /**
   * Setter of stop consuming variable
   * @param value Boolean, true to make consumeMessages() return after its current poll
   */
  def stopConsuming_=(value: Boolean): Unit = _stopConsuming = value

  /**
   * Poll the subscribed topic and print every received record to standard
   * output, until stopConsuming becomes true.
   *
   * The flag is checked once per iteration, i.e. after each poll returns.
   */
  def consumeMessages(): Unit = {
    while (!_stopConsuming) {
      // NOTE(review): newer Kafka clients deprecate poll(long) in favour of
      // poll(java.time.Duration) — confirm the client version before switching.
      val records = consumer.poll(100)
      records.asScala.foreach(println)
    }
  }

  /**
   * Close the underlying Kafka consumer.
   *
   * NOTE(review): because KafkaConsumer is not thread-safe, this is presumably
   * meant to be called from the thread that ran consumeMessages(), after that
   * method has returned — confirm against the client documentation.
   */
  def close(): Unit = consumer.close()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Entry point: builds a Consumer for the broker at localhost:9092 and
 * prints every record received on the topic.
 *
 * Nothing here ever sets the consumer's stop flag, so consumeMessages()
 * keeps polling until the process is terminated.
 */
object Main extends App {
  val host: String = "localhost:9092"
  val topic: String = "topic"

  // Producer side of the test, currently disabled:
  //
  // val producer = new Producer(host, topic)
  // for (i <- 0 to 10) {
  //   var message = "Message (" + i + ")"
  //   producer.sendRecord( message)
  //   println("Message sent (" + message + ")")
  // }

  val consumer: Consumer = new Consumer(host, topic, group = "group-id")

  consumer.consumeMessages()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Properties | |
import org.apache.kafka.clients.producer._ | |
/**
 * Thin wrapper around a String/String KafkaProducer that sends records to a
 * single, changeable topic.
 *
 * @param host String with host or list of hosts separated by comma (used as `bootstrap.servers`)
 * @param topic_value String with the initial topic value
 */
class Producer(host: String, topic_value: String) {
  private var _topic = topic_value
  private val props = new Properties()

  props.put("bootstrap.servers", host)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](props)

  // send() is asynchronous: without a callback a failed delivery is never
  // surfaced anywhere. This callback reports such failures on stderr.
  private val reportFailure: Callback = new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      // The client passes a null exception on success; Option(...) maps that to None.
      Option(exception).foreach { e =>
        Console.err.println(s"Failed to send record: $e")
      }
  }

  /**
   * Getter for topic value
   * @return String containing the topic value
   */
  def topic: String = _topic

  /**
   * Setter for the topic value; affects records sent after the call.
   * @param value String containing the new topic value
   */
  def topic_=(value: String): Unit = _topic = value

  /**
   * Send a new record (without key) to the current topic of the object.
   *
   * The call does not wait for the delivery to complete; a delivery failure
   * is reported on standard error by the completion callback.
   *
   * @param content String with the record content
   */
  def sendRecord(content: String): Unit = {
    val record = new ProducerRecord[String, String](_topic, content)
    producer.send(record, reportFailure)
  }

  /**
   * Close the producer
   */
  def close(): Unit = {
    producer.close()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment