This Python program walks through the files in a given directory, parses every log file, and filters out unwanted data. Once the data is filtered, the script uses the pandas
library to run the analytics: it prints the total number of unique users and the top 10 users by longest session,
showing each user's shortest and longest session length in minutes along with their page hits.
- Python 3.7.4
- Pip Library: pandas 0.24.2
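The pinned pandas version can be installed with pip, for example:
> pip3 install pandas==0.24.2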
> python3 sessions.py
Unique Sessions: 30
Top 10 Sessions by User:
user_id min max hits
0 71f28176 860.583333 860.583333 8835
1 489f3e87 860.416667 860.416667 11621
2 eaefd399 860.383333 860.383333 4312
3 95c2fa37 860.350000 860.350000 4732
4 34a3f7b7 856.000000 856.000000 1907
5 43a81873 0.000000 409.050000 3926
6 b3a60c78 1.200000 245.333333 408
7 be1b1a35 0.000000 221.083333 262
8 1ee6120e 0.000000 132.150000 167
9 d1f3e968 0.000000 121.616667 223
- Use generator expressions for lazy evaluation
- Apply a map/filter/aggregate strategy instead of slurping everything at once (see the sketch below)
- Write tests for the parse logic and the analytics logic (a starting point is sketched after the script)
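The first two items could be addressed by making the parse step a lazy pipeline. Below is a minimal sketch that reuses the regex and filtering rules from sessions.py; the helper name iter_rows() and the idea of feeding its rows straight into a DataFrame are illustrative assumptions, not part of the current script.

import fileinput
import os
import re
from dateutil.parser import parse

LOG_FILES_PATH = "./opt/logs"
REGEX = r'([(\d\.)]+) - - (.*?\s-.*?) "(.*?)" (\d+) -'

def iter_rows():
    """Yield (client_ip, timestamp, user_id, status_code) tuples lazily."""
    files = (os.path.join(LOG_FILES_PATH, f) for f in os.listdir(LOG_FILES_PATH))
    with fileinput.input(files=files) as lines:
        matches = (re.match(REGEX, line) for line in lines)          # map
        groups = (m.groups() for m in matches if m)                  # drop non-matching lines
        for ip, logtime, uri, status in groups:
            path = uri.split()[1]
            if path == '/' or '?' in path or path.count('/') == 2:   # skip URIs without a user ID
                continue
            tstamp = parse(logtime[:11] + " " + logtime[12:])
            yield ip, tstamp, path.split('/', 4)[3], status          # one row at a time

The consumer, e.g. pandas.DataFrame(iter_rows(), columns=['client_ip', 'timestamp', 'user_id', 'status_code']), then does the aggregation, so no intermediate CSV string has to be built up in memory.
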
#!/usr/bin/env python3
import re
import sys
import collections
from dateutil.parser import parse
import pandas as pd
import fileinput
import os
LOG_FILES_PATH = "./opt/logs"
SESSION_TIME_DELTA = 10 # Minutes
def log_parse():
    """Parse the log files, filter out entries without a user ID, and hand the result to log_analyse()."""
# IP, timestamp, URI, Status
regex = r'([(\d\.)]+) - - (.*?\s-.*?) "(.*?)" (\d+) -'
entries = []
filtered_data = ''
data_files = (LOG_FILES_PATH + "/" + f for f in os.listdir(LOG_FILES_PATH))
try:
with fileinput.input(files=data_files) as f:
for line in f:
                match = re.match(regex, line)
                if not match:
                    # Skip lines that do not match the expected log format.
                    continue
                matches = match.groups()
                entries.append({
                    'client_ip': matches[0],
                    'time': matches[1],
                    'uri': matches[2],
                    'status_code': matches[3]
                })
for elem in entries:
sess_path = elem['uri'].split()[1]
# Ignore uri which has missing user ID
if sess_path == '/' or '?' in sess_path or sess_path.count('/') == 2:
continue
user_id = sess_path.split('/', 4)[3]
logtime = elem['time']
tstamp = parse(logtime[:11] + " " + logtime[12:])
filtered_data = filtered_data + \
f"{elem['client_ip']},{tstamp},{user_id}, {elem['status_code']}\n"
except IndexError as err:
print("error: {0}".format(err))
except Exception as exp:
print(exp)
# Analyse
log_analyse(filtered_data)
def log_analyse(filtered_data):
try:
df = pd.read_csv(pd.compat.StringIO(filtered_data), names=[
'client_ip', 'timestamp', 'user_id', 'status_code'], parse_dates=[1])
# page hits
page_hits = df.groupby(['user_id']).size().reset_index(name='hits')
# session counts
df.sort_values(by=['user_id', 'timestamp'], inplace=True)
        # Bucket hits into sessions: start a new session whenever the gap to the
        # previous hit exceeds SESSION_TIME_DELTA minutes or the user changes.
cond1 = df.timestamp - \
df.timestamp.shift(1) > pd.Timedelta(SESSION_TIME_DELTA, 'm')
cond2 = df.user_id != df.user_id.shift(1)
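        # The running cumulative sum of these "new session" flags assigns a
        # unique session id to every hit.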
df['session_id'] = (cond1 | cond2).cumsum()
# min and max rolling sessions by user_id
session_length = df.groupby(['user_id', 'session_id'])[
'timestamp'].agg(['min', 'max']).reset_index()
session_length['mins'] = (
session_length['max'] - session_length['min']) / pd.Timedelta(minutes=1)
sessions = session_length.drop(['session_id', 'min', 'max'], axis=1)
sessions = sessions.groupby(['user_id'])['mins'].agg(
['min', 'max']).sort_values(['max'], ascending=False)
print(f"Unique Sessions: {len(sessions.index)}")
print(f"Top 10 Sessions by User:")
result = sessions.merge(page_hits, on=('user_id'))
print(result.head(10))
except Exception as exp:
print(exp)
if __name__ == "__main__":
log_parse()
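
The last improvement item calls for tests. A minimal pytest sketch for the parse regex could look like the following; the file name test_sessions.py and the sample line are hypothetical, shaped to match the regex and the timestamp slicing above, so adjust them to the real log format.

# test_sessions.py (hypothetical): run with pytest.
import re

# Same pattern as sessions.py: IP, timestamp, URI, status.
LOG_REGEX = r'([(\d\.)]+) - - (.*?\s-.*?) "(.*?)" (\d+) -'

def test_log_regex_extracts_fields():
    # Assumed log line format, inferred from the regex above -- not real data.
    line = '10.0.0.1 - - 01/Jul/2019:00:00:06 -0400 "GET /api/user/71f28176/home HTTP/1.1" 200 -'
    ip, ts, uri, status = re.match(LOG_REGEX, line).groups()
    assert ip == '10.0.0.1'
    assert ts == '01/Jul/2019:00:00:06 -0400'
    # user_id is the third path segment, as in log_parse()
    assert uri.split()[1].split('/', 4)[3] == '71f28176'
    assert status == '200'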