#!/usr/bin/env python
# coding: utf-8

# In[1]:

get_ipython().system('pip install --upgrade git+https://github.com/neilpquinn/wmfdata.git')

# In[2]:

import wmfdata
from pyspark.sql.functions import from_json
from pyspark.sql.types import _parse_datatype_json_string
# In[4]:

def load_spark_schema(schema_url, spark_session=None):
    """
    Given a full URL to a JSONSchema (event schema), this will use refinery's
    JsonSchemaConverter to load and parse the JSONSchema into a Spark schema and return it.
    """
    if spark_session is None:
        spark_session = wmfdata.spark.get_session()
    sc = spark_session.sparkContext
    # Grab the refinery JVM classes through the py4j gateway.
    JsonSchemaLoader = sc._jvm.org.wikimedia.analytics.refinery.core.jsonschema.JsonSchemaLoader
    JsonSchemaConverter = sc._jvm.org.wikimedia.analytics.refinery.spark.sql.JsonSchemaConverter
    URI = sc._jvm.java.net.URI

    uri_j = URI(schema_url)
    jsonschema_j = JsonSchemaLoader.getInstance().load(uri_j)
    sparkschema_j = JsonSchemaConverter.toSparkSchema(jsonschema_j)
    # Round-trip the JVM StructType through its JSON representation
    # to get a Python StructType.
    return _parse_datatype_json_string(sparkschema_j.json())
# In[6]:

spark_extra_config = {
    'spark.jars': '/home/otto/kafka-clients-1.1.0.jar,/home/otto/spark-sql-kafka-0-10_2.11-2.4.4.jar,/home/otto/refinery-job-0.0.112.jar'
}
spark = wmfdata.spark.get_session(
    master='local', app_name='PySpark Streaming Demo', spark_config=spark_extra_config
)
# In[7]:

# We'll be consuming the revision-create stream, so we'll use the mediawiki/revision/create event schema.
schema_url = 'https://schema.wikimedia.org/repositories/primary/jsonschema/mediawiki/revision/create/1.0.0.json'

# Load and convert the JSONSchema to a Spark schema.
spark_schema = load_spark_schema(schema_url, spark)
# In[9]:

# Create a Spark data stream connected to Kafka's eqiad.mediawiki.revision-create topic.
# This stream comes to us as schemaless key, value pairs.
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092")
    .option("subscribe", "eqiad.mediawiki.revision-create")
    .load()
)
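# The raw Kafka source DataFrame has a fixed set of columns defined by the
# Spark Kafka connector (key, value, topic, partition, offset, timestamp,
# timestampType); key and value are binary and must be cast or parsed
# explicitly, as we do in the next cell. A quick way to see this:
kafka_stream.printSchema()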
# In[10]:

# The JSON data is in the Kafka message value.
# We need to read it as JSON (using the from_json function) with our schema,
# and then select the subfields into a top-level DataFrame.
revision_create_stream = (
    kafka_stream
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", spark_schema).alias("data"))
    .select("data.*")
)
# In[12]:

# Let's check that our streaming DataFrame has the expected schema.
revision_create_stream.printSchema()
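# For reference, top-level fields in the mediawiki/revision/create schema
# include page_title, page_id, rev_id, performer, and meta (an abridged
# list; the schema_url above is the authoritative source).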
# In[13]: | |
# We are going to filter the edit stream for pages that have these keywords in the title, | |
# and then count group by page title and count desc | |
page_title_keywords = [ | |
'Draft', 'film', 'Talk', 'User' | |
] | |
page_title_pattern = '|'.join(page_title_keywords) | |
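# A quick sanity check of the keyword pattern using plain Python's re module
# (illustrative only; Spark's rlike applies the same style of regex matching
# on the JVM side):
import re
assert re.search(page_title_pattern, 'Talk:Sandbox')
assert re.search(page_title_pattern, 'Draft:New_article')
assert not re.search(page_title_pattern, 'Main_Page')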
# In[27]:

# Query the DataFrame using the Spark SQL API.
# This will maintain counts per page_title (for titles matching the keyword
# pattern above), updated every 30 seconds.
# NOTE: .format("memory") keeps the results in an in-memory table we can
# query below; this could instead write back to another Kafka topic, to
# files, or elsewhere.
page_title_counts = (
    revision_create_stream
    .where(revision_create_stream.page_title.rlike(page_title_pattern))
    .select("page_title")
    .groupBy("page_title").count()
    .orderBy("count", ascending=False)
    .writeStream
    .format("memory")
    .queryName("page_title_counts")
    .trigger(processingTime="30 seconds")
    .outputMode("complete")
    .start()
)
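# Illustrative: once the first 30-second trigger has fired, the in-memory
# table can be queried with ordinary Spark SQL (until then it is empty).
spark.sql("SELECT * FROM page_title_counts ORDER BY count DESC LIMIT 10").show(truncate=False)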
# In[42]:

# Plot the top 10 page title counts from the in-memory results table.
spark.sql("select * from page_title_counts").toPandas().iloc[:10].plot.bar()
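# Optional tweak (a sketch): index by page_title so the bars are labeled by
# title rather than by row position.
df = spark.sql("select * from page_title_counts").toPandas()
df.set_index("page_title").iloc[:10].plot.bar(y="count")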
# In[32]:

# List the streaming queries currently running in this session.
spark.streams.active
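# The StreamingQuery handle returned by .start() also exposes health and
# progress information, e.g.:
page_title_counts.status        # current trigger state
page_title_counts.lastProgress  # metrics from the most recent micro-batch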
# In[ ]:

# Stop the streaming query when done.
page_title_counts.stop()
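# A minimal cleanup sketch: stop any remaining active queries and end the
# Spark session once the demo is finished.
for query in spark.streams.active:
    query.stop()
spark.stop()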