Kafka Producer/Consumer Example in Scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util | |
import org.apache.kafka.clients.consumer.KafkaConsumer | |
import scala.collection.JavaConverters._ | |
/** Minimal Kafka consumer example.
  *
  * Connects to the broker at `localhost:9092`, subscribes to [[TOPIC]] as a
  * member of consumer group `something`, and prints every record it receives
  * to standard output. The poll loop has no exit condition, so the program
  * runs until the process is terminated or a Kafka call throws; in the latter
  * case the consumer is closed before the exception propagates.
  */
object ConsumerExample extends App {
  import java.util.Properties

  val TOPIC = "test"

  // Keys and values are both read as plain strings.
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("group.id", "something")

  val consumer = new KafkaConsumer[String, String](props)

  try {
    consumer.subscribe(util.Collections.singletonList(TOPIC))
    while (true) {
      // NOTE(review): poll(long) is deprecated in newer kafka-clients releases
      // (2.0+); once the build is on such a version, prefer
      // consumer.poll(java.time.Duration.ofMillis(100)). Kept as-is here so the
      // example still compiles against older clients.
      val records = consumer.poll(100)
      records.asScala.foreach(record => println(record))
    }
  } finally {
    // Release the consumer's resources even when poll/subscribe fails.
    consumer.close()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Minimal Kafka producer example.
  *
  * Connects to the broker at `localhost:9092` and sends 51 string records to
  * [[TOPIC]], all with the key `"key"`: `hello 1` through `hello 50`, followed
  * by a final `the end <current date>` marker. The producer is always closed,
  * including when a send throws.
  */
object ProducerExample extends App {
  import java.util.Properties
  import org.apache.kafka.clients.producer._

  // Keys and values are both written as plain strings.
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val TOPIC = "test"

  try {
    for (i <- 1 to 50) {
      // The value returned by send is not inspected (fire-and-forget).
      producer.send(new ProducerRecord[String, String](TOPIC, "key", s"hello $i"))
    }
    val record = new ProducerRecord[String, String](TOPIC, "key", s"the end ${new java.util.Date}")
    producer.send(record)
  } finally {
    // Per the KafkaProducer documentation, close() waits for previously sent
    // requests to complete; running it in finally avoids leaking the producer
    // when a send fails.
    producer.close()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
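Hi, I am new to Kafka (I have been using it for only 3 days), but I think this error:
[error] Modules were resolved with conflicting cross-version suffixes in {file:/home/edureka/Desktop/code_practicals/}code_practicals:
[error] org.apache.kafka:kafka _2.11, _2.10
means that the build is resolving Kafka artifacts compiled for two different Scala versions (2.10 and 2.11) at the same time, so you should use Scala version 2.11 instead of 2.10 so that every dependency shares the same suffix.
Correct me if I am wrong.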