Create a gist now

Instantly share code, notes, and snippets.

Embed
Kafka Producer/Consumer Example in Scala
import java.util
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
object ConsumerExample {
  /**
   * Polls the "test" topic on a local broker forever and prints every
   * record received. Runs until the process is killed or poll() throws.
   *
   * Uses an explicit main() instead of the App trait to avoid the
   * DelayedInit initialization-order pitfalls.
   */
  def main(args: Array[String]): Unit = {
    import java.util.Properties

    val Topic = "test"
    // How long a single poll() blocks waiting for records, in milliseconds.
    val PollTimeoutMs = 100L

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", "something")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Collections.singletonList(Topic))
    try {
      while (true) {
        val records = consumer.poll(PollTimeoutMs)
        for (record <- records.asScala)
          println(record)
      }
    } finally {
      // Only reached when poll() throws; releases sockets and lets the
      // broker rebalance the consumer group promptly instead of waiting
      // for a session timeout.
      consumer.close()
    }
  }
}
object ProducerExample {
  /**
   * Sends 50 "hello $i" messages plus one final timestamped message to the
   * "test" topic on a local broker, then closes the producer.
   *
   * Uses an explicit main() instead of the App trait to avoid the
   * DelayedInit initialization-order pitfalls.
   */
  def main(args: Array[String]): Unit = {
    import java.util.Properties
    import org.apache.kafka.clients.producer._

    val Topic = "test"

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      for (i <- 1 to 50) {
        // All messages share one key, so they go to a single partition
        // and are therefore delivered in order.
        producer.send(new ProducerRecord[String, String](Topic, "key", s"hello $i"))
      }
      producer.send(new ProducerRecord[String, String](Topic, "key", s"the end ${new java.util.Date}"))
    } finally {
      // close() flushes any buffered records before releasing resources.
      // The finally block guarantees it runs even if a send() throws,
      // which the original code did not.
      producer.close()
    }
  }
}
@ChandrasekaranT

This comment has been minimized.

Show comment
Hide comment
@ChandrasekaranT

ChandrasekaranT Nov 3, 2017

Hi
I tried using kafka_2.11-0.11.0.1 with scalaVersion := "2.10.4", and when I run `sbt package` from the command prompt I get the error below.
Below is the build.sbt content
name := "Analyze Logs"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-twitter_2.10" % "1.5.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-twitter_2.10" % "1.5.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.5.2"
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.2.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-assembly_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.5.2"

Error message:-

[error] Modules were resolved with conflicting cross-version suffixes in {file:/home/edureka/Desktop/code_practicals/}code_practicals:
[error] org.apache.kafka:kafka _2.11, _2.10

Could anyone help me resolve the above error, and tell me what needs to be updated in the sbt file?
regards
T.Chandrasekaran

Hi
I tried using kafka_2.11-0.11.0.1 with scalaVersion := "2.10.4", and when I run `sbt package` from the command prompt I get the error below.
Below is the build.sbt content
name := "Analyze Logs"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-twitter_2.10" % "1.5.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-twitter_2.10" % "1.5.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.5.2"
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.2.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-assembly_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.5.2"

Error message:-

[error] Modules were resolved with conflicting cross-version suffixes in {file:/home/edureka/Desktop/code_practicals/}code_practicals:
[error] org.apache.kafka:kafka _2.11, _2.10

Could anyone help me resolve the above error, and tell me what needs to be updated in the sbt file?
regards
T.Chandrasekaran

@mdaafaq

This comment has been minimized.

Show comment
Hide comment
@mdaafaq

mdaafaq Dec 7, 2017

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-8" % "(your spark version)" % "provided"

mdaafaq commented Dec 7, 2017

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-8" % "(your spark version)" % "provided"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment