Last active
November 28, 2024 13:00
Kafka Producer/Consumer Example in Scala
import java.util
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object ConsumerExample extends App {
  import java.util.Properties

  val TOPIC = "test"

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("group.id", "something")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(util.Collections.singletonList(TOPIC))

  // Poll forever, printing every record received.
  while (true) {
    // Wait up to 100 ms for records. poll(long) is deprecated since Kafka 2.0;
    // newer clients take poll(java.time.Duration.ofMillis(100)) instead.
    val records = consumer.poll(100)
    for (record <- records.asScala) {
      println(record)
    }
  }
}
object ProducerExample extends App {
  import java.util.Properties
  import org.apache.kafka.clients.producer._

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  val TOPIC = "test"

  // Send 50 messages under the same key, so they all land in one partition.
  for (i <- 1 to 50) {
    val record = new ProducerRecord(TOPIC, "key", s"hello $i")
    producer.send(record)
  }

  // Final marker message carrying the current date and time.
  val record = new ProducerRecord(TOPIC, "key", "the end " + new java.util.Date)
  producer.send(record)

  // close() blocks until previously sent records have been delivered.
  producer.close()
}
I'm using the Eclipse IDE and getting the error below. Any idea what the issue is?
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
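The lines above are log4j 1.2 warnings rather than failures: no logging configuration was found on the classpath, so Kafka's log output is simply dropped. A minimal sketch of one way to silence them, assuming log4j 1.2 is the logging backend (as the URL in the warning suggests); the object name `LoggingSetup` is hypothetical and not part of the gist:

```scala
import org.apache.log4j.{BasicConfigurator, Level, Logger}

// Hypothetical helper, not part of the original gist.
object LoggingSetup {
  // Attach a console appender to the root logger and keep INFO and above.
  def init(): Unit = {
    BasicConfigurator.configure()
    Logger.getRootLogger.setLevel(Level.INFO)
  }
}
```

Calling `LoggingSetup.init()` as the first statement of `ProducerExample` or `ConsumerExample` should be enough. Putting a `log4j.properties` file on the classpath is the more common alternative.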
Hi, I am new to Kafka (I have only been using it for 3 days), but I think
[error] Modules were resolved with conflicting cross-version suffixes in {file:/home/edureka/Desktop/code_practicals/}code_practicals:
[error] org.apache.kafka:kafka _2.11, _2.10
means that the build mixes artifacts built for Scala 2.10 and 2.11, so you should use Scala 2.11 instead of 2.10.
Correct me if I am wrong...
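That sbt error says the same module was resolved under two Scala binary suffixes (`_2.11` and `_2.10`). A minimal sketch of a build that avoids it, assuming Scala 2.11 and the Kafka 1.1.1 artifact mentioned later in this thread: set a single `scalaVersion` and let `%%` append the matching suffix instead of hard-coding it.

```scala
// build.sbt (sketch; the versions here are assumptions, adjust to your setup)
scalaVersion := "2.11.12"

// %% appends the Scala binary version, so this resolves to kafka_2.11
libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.1"
```

The two examples above only use client classes, so `"org.apache.kafka" % "kafka-clients" % "1.1.1"` may be enough. That artifact is pure Java and carries no Scala suffix, which sidesteps the conflict entirely.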
A full kafkastreams example is here:
https://github.com/fancellu/kafkastreams
"org.apache.kafka" % "kafka_2.11" % "1.1.1" this worked fine for me.
name := "myApp"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.0"
libraryDependencies ++= Seq(
  "org.apache.bahir" %% "spark-streaming-twitter" % sparkVersion,
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" % "spark-sql_2.11" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.kafka" % "kafka_2.11" % "1.1.1"
)