@koljamaier
Created October 28, 2016 07:32
Spark DataFrame example
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import json
import re
# Pattern that picks the HTTP request line (method, endpoint, protocol/version) out of the raw log body
api_pattern = re.compile(r'(POST|HEAD|GET).*HTTP......[0-9]*.[0-9]*')
def matcher(line):
    match = api_pattern.search(line)
    if match:
        res = match.group().replace('"', '')
        words = res.split()
        # Use everything except the last token of the matched request line as the key
        key = " ".join(words[0:len(words) - 1])
        return key
    else:
        return None
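# Hypothetical example of what matcher() returns for a typical access-log style body:
#   matcher('127.0.0.1 - - "GET /status HTTP/1.1" 200') -> 'GET /status HTTP/1.1'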
conf = SparkConf().setAppName("dataframeTest").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
distFile = sc.textFile("logfile.txt")
# Load the log file, parse each line into an (endpoint, 1) pair, drop lines without a match
# and sum the counts per endpoint
api_requests_rdd = distFile.map(lambda line: (matcher(json.loads(line)["body"]), 1)) \
    .filter(lambda kv: kv[0] is not None) \
    .reduceByKey(lambda a, b: a + b)
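# Illustrative shape of the result: each element is now a (key, count) pair,
# e.g. ('GET /status HTTP/1.1', 42) -- the count here is made up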
# Specify the fields of the table. request_count is a numeric column; the third argument "True"
# means that null values are allowed (nullable)
fields = [StructField("api_endpoint", StringType(), True),
          StructField("request_count", LongType(), True)]
schema = StructType(fields)
# Apply the schema to the RDD, which turns it into a DataFrame. Cache it because it is used several times below
api_request_dataframe = sqlContext.createDataFrame(api_requests_rdd, schema).cache()
# Example queries/operations from the DataFrame API
api_request_dataframe.show()
api_request_dataframe.printSchema()
api_request_dataframe.filter(api_request_dataframe["request_count"]>10).show()
api_request_dataframe.select(api_request_dataframe["api_endpoint"]).show()
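# Hedged extra example (not in the original gist): list the most-requested endpoints first,
# using the plain DataFrame API (Column.desc() and DataFrame.sort())
api_request_dataframe.sort(api_request_dataframe["request_count"].desc()).show(5)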
# Register the DataFrame as a table.
api_request_dataframe.registerTempTable("api_requests")
# SQL can be run over DataFrames that have been registered as a table. This returns a DataFrame!
results = sqlContext.sql("SELECT api_endpoint FROM api_requests")
# The result of a SQL query is again a DataFrame; in Spark 1.6 it also supports the normal RDD operations such as map()
api_endpoints = results.map(lambda ep: "Api endpoint: " + ep["api_endpoint"])
for ep in api_endpoints.collect():
    print(ep)
DataFrames are an important concept in Spark. They enrich RDDs with schema information, which Spark can exploit for better performance, and they make it possible to query data on the cluster in a SQL-like manner.
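As a minimal sketch on top of the script above (not part of the original example), the optimizations Spark applies to such a DataFrame query can be inspected with explain():

# Print the logical and physical plans Spark generates for a DataFrame query
api_request_dataframe.filter(api_request_dataframe["request_count"] > 10).explain(True)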
This example is based on http://spark.apache.org/docs/1.6.2/sql-programming-guide.html