
@koljamaier
Created October 28, 2016 07:25
Spark cluster text processing vs. single node
From this little example I put together, I learned that it is important to think in the distributed MapReduce paradigm when getting hands-on with Spark. Operations that aggregate into centralized state, like the shared dictionary in the single-node script below, are fine on one machine but should be avoided on the cluster; there the work is better expressed as distributed transformations (map, filter, reduceByKey).
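For reference, both scripts assume that logfile.txt contains one JSON object per line, with a "body" field that embeds the raw HTTP request line followed by a count. The sample line in the following quick standalone check of the regex is made up for illustration; the real log format may differ:

import json
import re

# Hypothetical sample line; the real logfile.txt format may differ.
sample = '{"body": "\\"POST /api/items HTTP/1.1\\" 42"}'

body = json.loads(sample)["body"]
match = re.search(r'(POST|HEAD|GET).*HTTP......[0-9]*.[0-9]*', body)
if match:
    words = match.group().replace('"', '').split()
    print(" ".join(words[:-1]))  # key:            POST /api/items HTTP/1.1
    print(int(words[-1]))        # trailing count: 42

First the plain single-node version, then the Spark version.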
import json
import re


def matcher(text):
    # Extract "<METHOD> <path> HTTP/x.y <count>" fragments from the raw body.
    for match in re.finditer(r'(POST|HEAD|GET).*HTTP......[0-9]*.[0-9]*', text):
        if match:
            res = match.group().replace('"', '')
            words = res.split()
            key = " ".join(words[:-1])   # request line without the trailing count
            requests = int(words[-1])    # trailing count
            print(key)
            if key in api_requests:
                api_requests[key][0] += requests
                api_requests[key][1] += 1
            else:
                # per key: [sum of trailing counts, number of matching lines]
                api_requests[key] = [requests, 1]
        else:
            print("Nothing found")
    return


with open("logfile.txt", "r") as f:
    # new dictionary
    api_requests = {}
    for line in f:
        data = json.loads(line)
        elm = data["body"]
        matcher(elm)
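The dictionary ends up holding [sum of trailing counts, number of matching lines] per key, which would allow, for example, an average per request line afterwards. This follow-up snippet is my addition and not part of the original script:

# Hypothetical follow-up: average trailing count per key.
for key, (total, occurrences) in api_requests.items():
    print(key, total / occurrences)

The cluster version replaces the shared dictionary with distributed transformations: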
from pyspark import SparkContext, SparkConf
import json
import re

# Compile the regex once at module level instead of on every call.
api_pattern = re.compile(r'(POST|HEAD|GET).*HTTP......[0-9]*.[0-9]*')


def matcher(text):
    match = api_pattern.search(text)
    if match:
        res = match.group().replace('"', '')
        words = res.split()
        key = " ".join(words[:-1])   # request line without the trailing count
        return key
    else:
        return None


conf = SparkConf().setAppName("test").setMaster("local")
sc = SparkContext(conf=conf)

distFile = sc.textFile("logfile.txt")

# Emit a (key, 1) pair per log line, drop lines without a match,
# then count occurrences per key in a distributed reduce.
api_requests = distFile.map(lambda line: (matcher(json.loads(line)["body"]), 1)) \
                       .filter(lambda kv: kv[0] is not None) \
                       .reduceByKey(lambda a, b: a + b)

list_test = api_requests.map(lambda kv: list(kv))
for name in list_test.collect():
    print(name[1])

api_requests.saveAsTextFile("api_requests")

# spark-submit C:\Users\koljacorneliusmaier\Documents\textprocessingCluster.py
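The cluster version above only counts how many lines produced each key; the trailing number that the single-node script also sums is dropped by matcher(). As a sketch (my addition, not part of the original gist, with a hypothetical helper parse()), the same [sum, count] aggregation could be expressed with reduceByKey like this:

# Sketch: aggregate (sum of trailing counts, number of occurrences) per key,
# mirroring the [sum, count] lists of the single-node version.
def parse(body):
    match = api_pattern.search(body)
    if match:
        words = match.group().replace('"', '').split()
        return " ".join(words[:-1]), int(words[-1])
    return None

pairs = distFile.map(lambda line: parse(json.loads(line)["body"])) \
                .filter(lambda kv: kv is not None) \
                .mapValues(lambda n: (n, 1)) \
                .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

for key, (total, occurrences) in pairs.collect():
    print(key, total, occurrences)

Because reduceByKey combines values on each partition before the shuffle, the aggregation stays distributed instead of collecting everything to the driver and updating a dictionary there.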