# Skip to content
#
# Instantly share code, notes, and snippets.
#
# What would you like to do?
#!/usr/bin/env python
# coding: utf-8
# In[1]:
# Shell out from the notebook to pip-install/upgrade a package from a git repository.
# NOTE(review): the repository URL after "git+" is missing here, so pip has nothing
# to install; restore the intended URL (presumably the wmfdata repo, confirm).
get_ipython().system('pip install --upgrade git+')
# In[2]:
import wmfdata
from pyspark.sql.functions import *
from pyspark.sql.types import _parse_datatype_json_string
# In[4]:
def load_spark_schema(schema_url, spark_session=None):
    """
    Load an event JSONSchema from a URL and return it as a Spark schema.

    Given a full URL to a JSONSchema (event schema), this uses refinery's
    JsonSchemaLoader and JsonSchemaConverter on the JVM side to load and
    convert the JSONSchema, then rebuilds the Spark schema in Python from
    the JSON form of the converted schema.

    :param schema_url: full URL of the JSONSchema to load.
    :param spark_session: session whose sparkContext is used to reach the
        JVM classes; when None, ``get_spark_session()`` is called.
    :return: the result of ``_parse_datatype_json_string`` applied to the
        converted schema's JSON string.
    """
    if spark_session is None:
        # NOTE(review): get_spark_session is not defined in the visible part
        # of this file; confirm it is in scope before relying on the default.
        spark_session = get_spark_session()

    sc = spark_session.sparkContext

    # NOTE(review): the right-hand sides of these JVM lookups were lost from
    # this copy of the file (and URI was never bound). They are reconstructed
    # from the standard java.net package and refinery's package layout;
    # confirm the class paths against the refinery jar actually loaded.
    jvm = sc._jvm
    URI = jvm.java.net.URI
    JsonSchemaLoader = jvm.org.wikimedia.analytics.refinery.core.jsonschema.JsonSchemaLoader
    JsonSchemaConverter = jvm.org.wikimedia.analytics.refinery.spark.sql.JsonSchemaConverter

    uri_j = URI(schema_url)
    jsonschema_j = JsonSchemaLoader.getInstance().load(uri_j)
    sparkschema_j = JsonSchemaConverter.toSparkSchema(jsonschema_j)

    # The JVM schema object cannot be used directly from Python, so it is
    # passed across as JSON and parsed back into a Python-side datatype.
    return _parse_datatype_json_string(sparkschema_j.json())
# In[6]:
# Jars handed to Spark through 'spark.jars' as one comma-separated string:
# the Kafka client, the Spark SQL Kafka source, and refinery-job.
# NOTE(review): the closing brace of this dict and the closing parenthesis of
# the get_session call were missing in this copy; if further config entries
# were lost along with them they are not recoverable from here.
spark_extra_config = {
    'spark.jars': ','.join([
        '/home/otto/kafka-clients-1.1.0.jar',
        '/home/otto/spark-sql-kafka-0-10_2.11-2.4.4.jar',
        '/home/otto/refinery-job-0.0.112.jar',
    ]),
}

# Session used by every later cell.
spark = wmfdata.spark.get_session(
    master='local',
    app_name='PySpark Streaming Demo',
    spark_config=spark_extra_config,
)
# In[7]:
# We'll be consuming the revision-create stream, so we'll use the mediawiki/revision/create event schema
# NOTE(review): schema_url is an empty string here; the actual schema URL appears
# to be missing from this copy and must be filled in before the call below can
# load anything. Confirm the intended URL.
schema_url = ''
# Load and convert the JSONSchema to a Spark Schema
spark_schema = load_spark_schema(schema_url, spark)
# In[9]:
# Open a streaming source subscribed to the Kafka topic
# eqiad.mediawiki.revision-create on the kafka-jumbo1001 bootstrap server.
# No schema has been applied to the records at this point.
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092")
    .option("subscribe", "eqiad.mediawiki.revision-create")
    .load()
)
# In[10]:
# The event JSON sits in the Kafka message value. Three steps turn it into a
# flat DataFrame:
#   1. cast the value column to a string,
#   2. parse that string with from_json against spark_schema (aliased "data"),
#   3. promote the parsed subfields to top-level columns via "data.*".
revision_create_stream = (
    kafka_stream
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", spark_schema).alias("data"))
    .select("data.*")
)
# In[12]:
# Let's check that our streaming DataFrame has the expected schema.
# In[13]:
# Keep only edits to pages whose title contains one of these keywords; the
# matching titles are later grouped and counted, highest count first.
page_title_keywords = [
    'Draft', 'film', 'Talk', 'User',
]
# Regex alternation of the keywords, e.g. 'Draft|film|Talk|User'.
page_title_pattern = '|'.join(page_title_keywords)
# In[27]:
# Start a streaming query over revision_create_stream:
#   - keep rows whose page_title matches page_title_pattern,
#   - count rows per page_title and order by that count, descending,
#   - write the complete result to the in-memory table "page_title_counts",
#     triggered every 30 seconds.
# NOTE: .format("memory") could be swapped for another sink (console, a Kafka
# topic, files, ...).
# NOTE(review): the "truncate" option looks like a leftover from a console-sink
# version of this query; it is kept so behaviour is unchanged. Confirm.
page_title_counts = (
    revision_create_stream
    .where(revision_create_stream.page_title.rlike(page_title_pattern))
    .select("page_title")
    .groupBy("page_title")
    .count()
    .orderBy("count", ascending=False)
    .writeStream
    .format("memory")
    .queryName("page_title_counts")
    .trigger(processingTime="30 seconds")
    .outputMode("complete")
    .option("truncate", False)
    .start()
)
# In[25]:
# In[42]:
# Read the "page_title_counts" table into pandas and take its first 10 rows.
spark.sql("select * from page_title_counts").toPandas().head(10)
# In[32]:
# In[ ]:
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment