Spark Structured Streaming example - count occurrences of a JSON field value in Kafka
// spark2-shell --jars /home/otto/kafka-clients-1.1.1.jar,/home/otto/spark-sql-kafka-0-10_2.11-2.3.1.jar
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
// Needed for the $"..." column syntax below. spark-shell imports this
// automatically, but it is required when running this as a standalone script.
import spark.implicits._
// Subscribe to eventlogging-valid-mixed using Spark structured streaming
val eventlogging_valid_mixed_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092,kafka-jumbo1002.eqiad.wmnet:9092")
  .option("subscribe", "eventlogging-valid-mixed")
  .load()
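// Note (not in the original gist): streaming reads from Kafka default to the
// latest offsets, so only new messages are counted. To replay the topic from
// the beginning instead, add:
//   .option("startingOffsets", "earliest")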
// Since eventlogging-valid-mixed contains events with many different schemas,
// we can't parse every message with a single complete schema (from_json needs
// a well defined schema). Instead, we create a partial schema that includes
// only the 'schema' field from EventCapsule. Applying it with the from_json
// SQL function drops every field in the data except the 'schema' name.
val sparkSchema = StructType(Seq(StructField("schema", StringType, true)))
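// For illustration only (a hypothetical message shape, not copied from the
// topic): a value like
//   {"schema": "Edit", "event": {...}, "timestamp": 1510000000}
// parses under sparkSchema to a row containing just "Edit"; from_json
// silently discards every field not present in the supplied schema.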
// Kafka messages are (key, value) pairs. Cast value to a string, parse it as
// JSON using the 'schema'-only sparkSchema, then select the 'schema' name
// field out of the parsed value. schemas will be a streaming DataFrame with a
// single 'schema' column.
val schemas = eventlogging_valid_mixed_stream
  .select(from_json($"value".cast("string"), sparkSchema).alias("value"))
  .select("value.schema")
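// Optional sanity check: print the derived schema of the streaming DataFrame.
// It should show a single nullable string column:
//   root
//    |-- schema: string (nullable = true)
schemas.printSchema()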
// Count by schema name.
val schemaCounts = schemas.groupBy("schema").count()
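// Note: an unbounded streaming aggregation like this one only supports the
// 'complete' and 'update' output modes; 'append' would require a watermark.
// That is why the query below uses outputMode("complete").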
// Before starting the streaming query, register a StreamingQueryListener
// whose onQueryProgress callback runs every time a micro-batch completes.
val streamListener = new StreamingQueryListener() {
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    // println("Query made progress: " + queryProgress.progress)
    // Query the in-memory table for the current schema counts.
    val currentDf = spark.sql("select * from schema_counts order by count desc")
    // Coalesce into a single Spark partition (the result should be small
    // anyway) and overwrite it into a Hive table.
    currentDf.repartition(1).write.mode("overwrite").saveAsTable("otto.eventlogging_valid_mixed_schema_counts")
    // Also print the top results, because why not! It's fun.
    currentDf.show()
  }

  // We don't need to do anything on query start or termination,
  // but we have to override these methods anyway.
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { }
}
// add the new listener callback
spark.streams.addListener(streamListener)
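// If you need to detach the callback later (e.g. before re-running this in
// the same shell session), it can be removed with:
//   spark.streams.removeListener(streamListener)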
// Start the streaming query, saving its results into an in-memory table
// named 'schema_counts'. That table can then be queried for up-to-date
// results at any time.
val streamingQuery = schemaCounts
  .writeStream
  .queryName("schema_counts")
  .outputMode("complete")
  .format("memory")
  .start()
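// With the query running, the in-memory table can be queried ad hoc from the
// shell at any time, e.g.:
spark.sql("select * from schema_counts order by count desc").show()

// To keep the shell blocked while the stream runs, use
// streamingQuery.awaitTermination(); to shut the query down, use
// streamingQuery.stop().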