# Spark Streaming SQL demo with netflow
# From stat1004:
# pyspark2 --jars ~otto/spark-sql-kafka-0-10_2.11-2.3.1.jar,~otto/kafka-clients-1.1.0.jar
# spark-sql-kafka provides the Kafka structured streaming source; kafka-clients provides the Kafka serdes.
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Declare a Spark schema that matches the JSON data.
# In a future MEP world, this would be loaded automatically
# from a JSON Schema.
netflow_schema = StructType() \
    .add("event_type", "string") \
    .add("tag2", "integer") \
    .add("as_src", "integer") \
    .add("as_dst", "integer") \
    .add("as_path", "string") \
    .add("peer_as_src", "integer") \
    .add("peer_as_dst", "string") \
    .add("ip_src", "string") \
    .add("ip_dst", "string") \
    .add("port_src", "integer") \
    .add("port_dst", "integer") \
    .add("tcp_flags", "string") \
    .add("ip_proto", "string") \
    .add("stamp_inserted", "timestamp") \
    .add("stamp_updated", "timestamp") \
    .add("packets", "integer") \
    .add("bytes", "integer") \
    .add("writer_id", "string")
# Create a streaming DataFrame from the netflow topic in Kafka.
netflow_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092") \
    .option("subscribe", "netflow") \
    .load()
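# By default a streaming Kafka source starts at the latest offsets; adding
# .option("startingOffsets", "earliest") above would replay whatever data
# the topic still retains.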
# The JSON data is in the Kafka message value.
# We need to parse it with our schema (using the from_json function),
# and then select the subfields into a top-level DataFrame.
netflow = netflow_stream \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", netflow_schema).alias("data")) \
    .select("data.*")
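# Optional sanity check: the Kafka source also supports bounded batch reads,
# so the same parsing can be tried with spark.read before starting a stream.
# A minimal sketch, assuming the retained topic data is small enough to scan:
netflow_sample = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092") \
    .option("subscribe", "netflow") \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", netflow_schema).alias("data")) \
    .select("data.*")
netflow_sample.printSchema()
netflow_sample.show(5, truncate=False)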
# Query the DataFrame using the Spark SQL API.
# This outputs the top 20 counts seen per ip_src (or whatever field you group by),
# updated every 30 seconds.
# NOTE: in "complete" output mode the watermark does not drop any aggregate state;
# it mainly matters for windowed aggregations in append/update mode.
# NOTE: .format("console") writes to the console; the query could instead
# write back to another Kafka topic, a file, or elsewhere (see the Kafka
# sink sketch below).
ip_src_counts = netflow \
    .withWatermark("stamp_updated", "30 seconds") \
    .groupBy("ip_src").count() \
    .orderBy("count", ascending=False) \
    .writeStream \
    .trigger(processingTime="30 seconds") \
    .outputMode("complete") \
    .option("truncate", False) \
    .format("console") \
    .start()
# -------------------------------------------
# Batch: 2
# -------------------------------------------
# +--------------+-----+
# |ip_src |count|
# +--------------+-----+
# |X.X.X.X |8809 |
# |X.X.X.X |8295 |
# |X.X.X.X |743 |
# ...
# +--------------+-----+
# only showing top 20 rows
# To stop the streaming query:
ip_src_counts.stop()
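# Instead of the console sink, the aggregation could be written back to Kafka.
# A minimal sketch: the output topic name and checkpoint path below are
# made-up placeholders, and the Kafka sink requires a string (or binary)
# "value" column, hence the to_json(struct(...)) projection.
ip_src_counts_kafka = netflow \
    .withWatermark("stamp_updated", "30 seconds") \
    .groupBy("ip_src").count() \
    .select(to_json(struct("ip_src", "count")).alias("value")) \
    .writeStream \
    .trigger(processingTime="30 seconds") \
    .outputMode("complete") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092") \
    .option("topic", "netflow_ip_src_counts") \
    .option("checkpointLocation", "/tmp/netflow_ip_src_counts_checkpoint") \
    .start()
# ip_src_counts_kafka.stop()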
# See also:
# https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
# https://spark.apache.org/docs/2.3.1/structured-streaming-kafka-integration.html
# https://spark.apache.org/docs/2.3.1/structured-streaming-programming-guide.html
# https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
#