Time Series Analytics on Session Log Data

Summary

This Python program walks through the files in a given directory, parses each log file, and filters out unwanted data. Once the data is filtered, the script uses the pandas library to do the analytics: it prints the total number of unique users and, for the top 10 users ranked by their longest session, the shortest and longest session lengths along with page hits per user.
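
To make the hand-off between the two steps concrete, here is a minimal sketch (with made-up values, timezone offsets omitted) of the intermediate comma-separated records that log_parse builds and log_analyse loads with pandas:

import io
import pandas as pd

# Illustration only: two fabricated records in the format the parser emits,
# one line per request: client_ip,timestamp,user_id, status_code
filtered_data = (
    "10.0.0.1,2019-10-29 16:38:00,abc12345, 200\n"
    "10.0.0.1,2019-10-29 16:52:00,abc12345, 200\n"
)
df = pd.read_csv(io.StringIO(filtered_data),
                 names=['client_ip', 'timestamp', 'user_id', 'status_code'],
                 parse_dates=[1])
print(df)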

Runtime Requirements

- Python 3.7.4
- Pip Library: pandas 0.24.2
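
The pandas dependency can be installed with pip, for example:

> pip3 install pandas==0.24.2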

Output

> python3 sessions.py 

Unique Sessions: 30
Top 10 Sessions by User:
    user_id         min         max   hits
0  71f28176  860.583333  860.583333   8835
1  489f3e87  860.416667  860.416667  11621
2  eaefd399  860.383333  860.383333   4312
3  95c2fa37  860.350000  860.350000   4732
4  34a3f7b7  856.000000  856.000000   1907
5  43a81873    0.000000  409.050000   3926
6  b3a60c78    1.200000  245.333333    408
7  be1b1a35    0.000000  221.083333    262
8  1ee6120e    0.000000  132.150000    167
9  d1f3e968    0.000000  121.616667    223

TODO Notes

  • Use generator expressions to do lazy computing
  • Apply a map/filter/aggregate strategy instead of a single slurp (a rough sketch follows this list)
  • Write test files for the parse logic and the analytics logic
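
A minimal sketch of what the lazy, map/filter-style parsing could look like (this is not part of the program below; the name parsed_lines is a placeholder):

import os
import re
import fileinput

LOG_FILES_PATH = "./opt/logs"
LOG_REGEX = re.compile(r'([(\d\.)]+) - - (.*?\s-.*?) "(.*?)" (\d+) -')

def parsed_lines():
    # Each stage is lazy: filenames, lines, and matches are produced on demand
    # instead of slurping everything into memory first.
    data_files = (os.path.join(LOG_FILES_PATH, f) for f in os.listdir(LOG_FILES_PATH))
    with fileinput.input(files=data_files) as lines:
        matches = (LOG_REGEX.match(line) for line in lines)   # map
        for m in filter(None, matches):                       # filter non-matching lines
            yield m.groups()  # (client_ip, time, uri, status_code)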

Program

#!/usr/bin/env python3

import io
import os
import re
import fileinput
from dateutil.parser import parse
import pandas as pd

LOG_FILES_PATH = "./opt/logs"
SESSION_TIME_DELTA = 10 # Minutes

def log_parse():
    'Parse all log files in LOG_FILES_PATH, filter out entries without a user ID, and hand the result to log_analyse().'
    # IP, timestamp, URI, Status
    regex = r'([(\d\.)]+) - - (.*?\s-.*?) "(.*?)" (\d+) -'
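    # Example of a line this regex is expected to match (assumed format; the real
    # logs may differ slightly):
    #   10.0.0.1 - - 29/Oct/2019:16:38:00 -0400 "GET /site/pages/abc12345/home HTTP/1.1" 200 -
    # Captured groups: client IP, timestamp, request line, status code.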
    entries = []
    filtered_data = ''
    data_files = (LOG_FILES_PATH + "/" + f for f in os.listdir(LOG_FILES_PATH))
    try:
        with fileinput.input(files=data_files) as f:
            for line in f:
                match = re.match(regex, line)
                if not match:
                    continue  # skip lines that do not look like access-log entries
                matches = match.groups()
                entries.append({
                    'client_ip': matches[0],
                    'time': matches[1],
                    'uri': matches[2],
                    'status_code': matches[3]
                })

        for elem in entries:
            sess_path = elem['uri'].split()[1]
            # Ignore URIs that have no user ID (root path, query strings,
            # or paths with too few segments)
            if sess_path == '/' or '?' in sess_path or sess_path.count('/') == 2:
                continue
            # The user ID is the third path segment of the URI
            user_id = sess_path.split('/', 4)[3]
            logtime = elem['time']
            # Swap the date/time separator for a space so dateutil can parse it
            tstamp = parse(logtime[:11] + " " + logtime[12:])
            filtered_data = filtered_data + \
                f"{elem['client_ip']},{tstamp},{user_id}, {elem['status_code']}\n"

    except IndexError as err:
        print("error: {0}".format(err))
    except Exception as exp:
        print(exp)

    # Analyse
    log_analyse(filtered_data)


def log_analyse(filtered_data):
    try:
        df = pd.read_csv(io.StringIO(filtered_data), names=[
                         'client_ip', 'timestamp', 'user_id', 'status_code'], parse_dates=[1])

        # page hits
        page_hits = df.groupby(['user_id']).size().reset_index(name='hits')

        # session counts
        df.sort_values(by=['user_id', 'timestamp'], inplace=True)
        # Rolling 10-minute session window: a new session starts when the gap to the
        # previous hit exceeds SESSION_TIME_DELTA minutes (cond1) or when the user
        # changes (cond2); the cumulative sum of these breaks assigns each session a
        # unique, increasing ID.
        cond1 = df.timestamp - \
            df.timestamp.shift(1) > pd.Timedelta(SESSION_TIME_DELTA, 'm')
        cond2 = df.user_id != df.user_id.shift(1)
        df['session_id'] = (cond1 | cond2).cumsum()

        # Session start (min) and end (max) per (user_id, session_id);
        # their difference is the session length in minutes.
        session_length = df.groupby(['user_id', 'session_id'])[
            'timestamp'].agg(['min', 'max']).reset_index()
        session_length['mins'] = (
            session_length['max'] - session_length['min']) / pd.Timedelta(minutes=1)

        # Shortest and longest session length per user, sorted by the longest
        sessions = session_length.drop(['session_id', 'min', 'max'], axis=1)
        sessions = sessions.groupby(['user_id'])['mins'].agg(
            ['min', 'max']).sort_values(['max'], ascending=False)

        print(f"Unique Sessions: {len(sessions.index)}")
        print(f"Top 10 Sessions by User:")
        result = sessions.merge(page_hits, on=('user_id'))
        print(result.head(10))

    except Exception as exp:
        print(exp)


if __name__ == "__main__":
    log_parse()
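
The session_id assignment in log_analyse leans on a cumulative-sum trick that is easy to miss on first read. A toy example with made-up timestamps shows how it splits one user's hits into sessions:

import pandas as pd

# Toy demonstration of the session_id trick used in log_analyse (values made up).
df = pd.DataFrame({
    'user_id':   ['a', 'a', 'a', 'b'],
    'timestamp': pd.to_datetime(['2019-10-29 10:00', '2019-10-29 10:05',
                                 '2019-10-29 10:30', '2019-10-29 10:31']),
})
df.sort_values(by=['user_id', 'timestamp'], inplace=True)
gap = df.timestamp - df.timestamp.shift(1) > pd.Timedelta(10, 'm')
new_user = df.user_id != df.user_id.shift(1)
df['session_id'] = (gap | new_user).cumsum()
print(df)
# User 'a' gets sessions 1 and 2 (the 25-minute gap starts a new one); 'b' gets 3.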