# AUTHOR: MARK KHAITMAN
from datetime import datetime, timedelta
from urlparse import urlparse
from csv import reader
fields = ('timestamp', 'elb', 'ip', 'backend:port', 'req_pt', 'bpt',
          'res_pt', 'elb_status', 'backend_status', 'rec_bytes', 'sent_bytes',
          'req', 'ua', 'ssl_cipher', 'ssl_protocol')
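# The tuple above mirrors the columns of the classic AWS ELB access log format:
# timestamp elb client:port backend:port request_processing_time
# backend_processing_time response_processing_time elb_status_code
# backend_status_code received_bytes sent_bytes "request" "user_agent"
# ssl_cipher ssl_protocol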
INACTIVITY_WINDOW = 60  # minutes of inactivity that close a session
def mapInput(iterator):
    '''Attach field names to the log lines so they're cleaner to work with below.'''
    r = reader(iterator, delimiter=' ', quotechar='"')
    for row in r:
        yield dict(zip(fields, row))
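# Quick sanity check of mapInput (a hedged sketch, not part of the original
# gist): feed one illustrative log line and inspect the resulting dict.
_example_line = ('2015-07-22T09:00:28.019143Z my-loadbalancer 203.0.113.7:54635 '
                 '10.0.4.150:80 0.000022 0.026109 0.00002 200 200 0 699 '
                 '"GET https://example.com:443/shop HTTP/1.1" "Mozilla/5.0" '
                 'ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2')
_rec = next(mapInput([_example_line]))
assert _rec['ip'] == '203.0.113.7:54635'
assert _rec['req'] == 'GET https://example.com:443/shop HTTP/1.1'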
def getSessions(iterator):
    '''Key = time, value = URL.
    This function is meant for flatMapValues, where the iterator is an
    iterable of (time, URL) pairs for one IP address RDD key.
    We iterate through the hits in chronological order, tracking minTime,
    maxTime, and the previous record's time, along with a set of distinct
    URLs visited during the session.
    If the time elapsed between two consecutive page requests reaches the
    INACTIVITY_WINDOW (minutes), we end the previous session and yield its
    length and the list of unique pages visited during it. We then reset
    those params and start a brand new session. This repeats until we've
    iterated through all page hits for that IP address.
    '''
    minTime = maxTime = lastTime = None
    uniquePages = set()
    # groupByKey doesn't guarantee the values arrive time-ordered, so sort
    # the hits chronologically before walking them.
    for k, v in sorted(iterator):
        if lastTime and abs(k - lastTime).total_seconds() / 60 >= INACTIVITY_WINDOW:
            yield (round((maxTime - minTime).total_seconds() / 60, 6), list(uniquePages))
            minTime = maxTime = lastTime = None
            uniquePages = set()
        if minTime is None:
            minTime = maxTime = k
        if k < minTime:
            minTime = k
        elif k > maxTime:
            maxTime = k
        uniquePages.add(v)
        lastTime = k
    yield (round((maxTime - minTime).total_seconds() / 60, 6), list(uniquePages))
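# A small local check of getSessions (a sketch with made-up timestamps, not
# part of the original gist): two hits 5 minutes apart form one session, and
# a hit arriving 2 hours later opens a second one.
_t0 = datetime(2015, 7, 22, 9, 0, 0)
_hits = [(_t0, '/shop'), (_t0 + timedelta(minutes=5), '/cart'),
         (_t0 + timedelta(hours=2), '/shop')]
for _s in getSessions(_hits):
    print _s
# -> (5.0, ['/shop', '/cart'])   (page order may vary; it comes from a set)
# -> (0.0, ['/shop'])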
# Here is where I create my RDD and point to the log location. For simplicity, I placed the log in HDFS under /tmp.
# I'm running a dual-core machine, so with hyper-threading I can split the input into 4 partitions and make use of all 4 logical cores.
# Each of the 4 partitions goes through the mapInput function defined above, attaching field names for the purpose of making my life easier :)
# I then map each record down to the client IP address, the datetime parsed from the timestamp string, and the path from the parsed request URL,
# since those are the only fields I care about.
# sortByKey orders the records by IP address, and groupByKey gives me an iterable of timestamp + URL pairs per IP address
# (getSessions sorts each group chronologically, since groupByKey makes no ordering guarantee on the values).
# I'm then ready to apply getSessions via flatMapValues to sessionize and gather the required statistics.
# Finally, I sort the result by descending session time, giving me the most engaged users from the beginning of my result to the end.
rdd = sc.textFile("hdfs://sandbox.hortonworks.com/tmp/2015_07_22_mktplace_shop_web_log_sample.log", 4) \
    .mapPartitions(mapInput) \
    .map(lambda l: (l.get('ip').split(':')[0],
                    (datetime.strptime(l.get('timestamp'), "%Y-%m-%dT%H:%M:%S.%fZ"),
                     urlparse(l.get('req').split(' ')[1]).path))) \
    .cache()
res = rdd.sortByKey().groupByKey().flatMapValues(getSessions).sortBy(lambda x: x[1][0], ascending=False).collect()
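# Each element of res has the shape (illustrative values, not real output):
#   ('203.0.113.7', (12.5, ['/shop', '/cart']))
# i.e. (client IP, (session length in minutes, unique pages in that session)).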
# 100 most engaged users (IP and session length), as defined by the inactivity window:
for x in res[:100]:
    print x[0], x[1][0]
# Average session time
sessionCount = 0
total = 0
for x in res:
    sessionCount += 1
    total += x[1][0]
print 'Average Session Time: %s Minutes' % (total / sessionCount)
#Average Session Time: 1.34913467626 Minutes for 15 minute inactivity window
#Average Session Time: 2.48392207583 Minutes for 30 minute inactivity window
#Average Session Time: 2.65802297988 Minutes for 60 minute inactivity window