Skip to content

Instantly share code, notes, and snippets.

@jlafall
Last active January 24, 2018 16:28
Show Gist options
  • Save jlafall/e2c0d88428ebf7d9e6cc6b53818b570d to your computer and use it in GitHub Desktop.
Save jlafall/e2c0d88428ebf7d9e6cc6b53818b570d to your computer and use it in GitHub Desktop.
Apache Spark Structured Streaming word count using Kafka as the source and Couchbase as the sink
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object WordCountKafkaCouchbase {

  /** Structured Streaming word count: reads records from the Kafka topic
    * "words", tokenizes each value on whitespace, maintains a running count
    * per word, and upserts the counts into the Couchbase bucket "words"
    * using the word itself as the document id.
    *
    * Blocks until the streaming query terminates or fails.
    */
  def main(args: Array[String]): Unit = {
    // Spark session configured for the Couchbase Spark connector.
    // NOTE(review): credentials are placeholders — supply real values,
    // preferably via external configuration rather than hard-coded literals.
    val spark = SparkSession
      .builder
      .appName("Word Count Test")
      .config("spark.couchbase.username", "[username goes here]")
      .config("spark.couchbase.password", "[password goes here]")
      .config("spark.couchbase.nodes", "localhost")
      .config("spark.couchbase.bucket.words", "")
      .getOrCreate()

    import spark.implicits._

    // Kafka source: each record is projected to a (key, value) string pair.
    // failOnDataLoss=false keeps the query running if offsets are no longer
    // available (e.g. expired by topic retention).
    val kafkaDS = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "words")
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Split every message value on runs of whitespace; the key is ignored.
    val wordDF = kafkaDS
      .flatMap { case (_, value) => value.split("""\s+""") }
      .toDF("word")

    // Running count per word, updated incrementally every micro-batch.
    val wordCountDF = wordDF
      .groupBy("word")
      .agg(count("word").as("count"))

    // Couchbase sink keyed by the "word" column; "update" output mode emits
    // only rows whose aggregate changed in the current batch.
    // FIX: the original chained .awaitTermination() onto .start(), so `query`
    // held Unit instead of the StreamingQuery handle.
    val query = wordCountDF.writeStream
      .format("com.couchbase.spark.sql")
      .outputMode("update")
      .option("checkpointLocation", "/home/vagrant/checkpoint")
      .option("idField", "word")
      .start()

    // Block the driver until the query stops or errors out.
    query.awaitTermination()
  }
}
import java.time.LocalDateTime
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object WordWithDateTimeKafkaCouchbase {

  /** Structured Streaming job: reads records from the Kafka topic "words",
    * tokenizes each value on whitespace, tags every word with the current
    * wall-clock timestamp, and upserts (word, date) documents into the
    * Couchbase bucket "words" using the word as the document id.
    *
    * Blocks until the streaming query terminates or fails.
    */
  def main(args: Array[String]): Unit = {
    // Spark session configured for the Couchbase Spark connector.
    // NOTE(review): credentials are placeholders — supply real values,
    // preferably via external configuration rather than hard-coded literals.
    val spark = SparkSession
      .builder
      .appName("Word Count Test")
      .config("spark.couchbase.username", "[username goes here]")
      .config("spark.couchbase.password", "[password goes here]")
      .config("spark.couchbase.nodes", "localhost")
      .config("spark.couchbase.bucket.words", "")
      .getOrCreate()

    import spark.implicits._

    // Kafka source: each record is projected to a (key, value) string pair.
    // failOnDataLoss=false keeps the query running if offsets are no longer
    // available (e.g. expired by topic retention).
    val kafkaDS = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "words")
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Pair each word with an ISO-8601 timestamp taken at processing time.
    // NOTE(review): LocalDateTime.now() executes on the executors, so the
    // value is non-deterministic across batch replays after a failure —
    // confirm this is acceptable, or derive the time from the Kafka record
    // timestamp instead.
    val wordDF = kafkaDS
      .flatMap { case (_, value) =>
        value.split("""\s+""").map(word => (word, LocalDateTime.now().toString))
      }
      .toDF("word", "date")

    // Couchbase sink keyed by the "word" column; "update" output mode.
    // FIX: the original chained .awaitTermination() onto .start(), so `query`
    // held Unit instead of the StreamingQuery handle.
    val query = wordDF.writeStream
      .format("com.couchbase.spark.sql")
      .outputMode("update")
      .option("checkpointLocation", "/home/vagrant/checkpoint")
      .option("idField", "word")
      .start()

    // Block the driver until the query stops or errors out.
    query.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment