Skip to content

Instantly share code, notes, and snippets.

@jfthuong
Last active October 17, 2018 14:48
Show Gist options
  • Save jfthuong/dc40cd98fb9cf796de87800b94a16799 to your computer and use it in GitHub Desktop.
Save jfthuong/dc40cd98fb9cf796de87800b94a16799 to your computer and use it in GitHub Desktop.
WPE - Week 04 - Solution
from collections import defaultdict, namedtuple
from datetime import datetime
import re
import sys
from timeit import timeit
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

import pandas as pd # type: ignore # "No library stub" for pandas
# Type aliases used in the signatures of both LogDicts implementations.
DictLog = Dict[str, str]  # one parsed log record
ListLogs = List[DictLog]  # a list of parsed records
SortFunc = Callable[[DictLog], Any]  # key function applied to a record for sorting
IterDictLog = Iterator[DictLog]  # iterator over parsed records
TDataFrame = Any # TODO: why can't we make alias to pd.core.frame.DataFrame?
class LogDicts_KOALA:
    """Access-log records kept in memory as plain dictionaries.

    Every record is a dict with the keys ``ip_address``, ``timestamp`` and
    ``request``. Records are stored sorted by timestamp, so the earliest and
    latest entries are simply the first and last items of the internal list.
    """

    format_timestamp = r"%d/%b/%Y:%H:%M:%S %z"

    # Compiled once for the class instead of being rebuilt for every line.
    _PATTERN_LOG = re.compile(
        r"""
        (?P<ip_address>[\d\.]+)[\-\s]+
        \[(?P<timestamp>.*?)\]\s*
        "(?P<request>.*?(?<!\\))"
        """,
        flags=re.X,
    )

    def __init__(self, logfilename: str) -> None:
        """Parse *logfilename* and store its records ordered by timestamp.

        Lines that do not contain a "." are skipped. Any error is reported on
        stdout and then re-raised unchanged.

        Raises:
            OSError: if the file cannot be opened or read.
            ValueError: if a line containing "." is not a log entry, or if a
                timestamp does not follow ``format_timestamp``.
        """

        def get_date(log: "DictLog") -> datetime:
            return datetime.strptime(log["timestamp"], self.format_timestamp)

        try:
            with open(logfilename) as file:
                # Sorting once here makes earliest()/latest() O(1) lookups.
                self.__logs = sorted(
                    (self._parse_line(line) for line in file if "." in line),
                    key=get_date,
                )
        except Exception as e:
            print(f"Error while trying to read {logfilename}: {e}")
            raise

    def _parse_line(self, line: str) -> "DictLog":
        """Return a dictionary of information from a line in log file.

        Raises:
            ValueError: if *line* does not match the expected log format.
        """
        match_line = self._PATTERN_LOG.match(line)
        if match_line is None:
            raise ValueError(f"No match for line {line!r}")
        return match_line.groupdict()

    def dicts(self, key: "Optional[SortFunc]" = None) -> "ListLogs":
        """Return a new list of the records, sorted by *key* when given.

        Without *key* the records come in timestamp order. The list is always
        a fresh one (the record dicts themselves are shared), so reordering it
        cannot corrupt the internal timestamp ordering.
        """
        if key is None:
            return list(self.__logs)
        return sorted(self.__logs, key=key)

    def iterdicts(self, key: "Optional[SortFunc]" = None) -> "IterDictLog":
        """Return an iterator over the records, sorted by *key* when given."""
        if key is None:
            # No copy needed: the generator only reads the internal list.
            yield from self.__logs
        else:
            yield from sorted(self.__logs, key=key)

    def earliest(self) -> "DictLog":
        """Return the dict with the earliest timestamp."""
        return self.__logs[0]

    def latest(self) -> "DictLog":
        """Return the dict with the latest timestamp."""
        return self.__logs[-1]

    def for_ip(self, ip_address: str, key: "Optional[SortFunc]" = None) -> "ListLogs":
        """Return all records for a particular IP address."""
        return [log for log in self.iterdicts(key) if log["ip_address"] == ip_address]

    def for_request(self, text: str, key: "Optional[SortFunc]" = None) -> "ListLogs":
        """Return all records whose request contains *text* as a substring."""
        return [log for log in self.iterdicts(key) if text in log["request"]]
class LogDicts_PANDA(pd.DataFrame):
    """Access-log records stored as rows of a pandas DataFrame.

    The frame has the columns ``ip_address``, ``timestamp`` and ``request``
    and its rows are ordered by timestamp, so the earliest and latest
    entries are the first and last rows.
    """

    format_timestamp = r"%d/%b/%Y:%H:%M:%S %z"

    # Compiled once for the class instead of being rebuilt for every line.
    _PATTERN_LOG = re.compile(
        r"""
        (?P<ip_address>[\d\.]+)[\-\s]+
        \[(?P<timestamp>.*?)\]\s*
        "(?P<request>.*?(?<!\\))"
        """,
        flags=re.X,
    )

    def __init__(self, logfilename: str) -> None:
        """Parse *logfilename* and load its records ordered by timestamp.

        Lines that do not contain a "." are skipped. Any error is reported on
        stdout and then re-raised unchanged.

        Raises:
            OSError: if the file cannot be opened or read.
            ValueError: if a line containing "." is not a log entry, or if a
                timestamp does not follow ``format_timestamp``.
        """

        def get_date(log: "DictLog") -> datetime:
            return datetime.strptime(log["timestamp"], self.format_timestamp)

        try:
            with open(logfilename) as file:
                # Rows are inserted already sorted so iloc[0] / iloc[-1] are
                # the earliest / latest records.
                super().__init__(
                    sorted(
                        (self._parse_line(line) for line in file if "." in line),
                        key=get_date,
                    )
                )
        except Exception as e:
            print(f"Error while trying to read {logfilename}: {e}")
            raise

    def _parse_line(self, line: str) -> "DictLog":
        """Return a dictionary of information from a line in log file.

        Raises:
            ValueError: if *line* does not match the expected log format.
                (An empty dict here would only fail later, in the timestamp
                sort, with an unhelpful ``KeyError: 'timestamp'``.)
        """
        match_line = self._PATTERN_LOG.match(line)
        if match_line is None:
            raise ValueError(f"No match for line {line!r}")
        return match_line.groupdict()

    def _get_elements(
        self, dataframe: "TDataFrame", key: "Optional[SortFunc]" = None
    ) -> "ListLogs":
        """Return the rows of *dataframe* as a list of dicts, sorted by *key* if given."""
        records = dataframe.to_dict("records")
        if key is None:
            return records
        return sorted(records, key=key)

    def dicts(self, key: "Optional[SortFunc]" = None) -> "ListLogs":
        """Return a list of the records, sorted by *key* when given."""
        return self._get_elements(self, key=key)

    def iterdicts(self, key: "Optional[SortFunc]" = None) -> "IterDictLog":
        """Return an iterator over the records, sorted by *key* when given."""
        # TODO: consider using self.iterrows() but tricky part is key sorting
        yield from self.dicts(key=key)

    def earliest(self) -> "DictLog":
        """Return the dict with the earliest timestamp."""
        return self.iloc[0].to_dict()

    def latest(self) -> "DictLog":
        """Return the dict with the latest timestamp."""
        return self.iloc[-1].to_dict()

    def for_ip(self, ip_address: str, key: "Optional[SortFunc]" = None) -> "ListLogs":
        """Return all records for a particular IP address."""
        # "@ip_address" binds the local variable as a value, so quotes or
        # operators inside the argument cannot alter the query expression.
        return self._get_elements(self.query("ip_address == @ip_address"), key=key)

    def for_request(self, text: str, key: "Optional[SortFunc]" = None) -> "ListLogs":
        """Return all records whose request contains *text* as a substring."""
        # regex=False: *text* is a literal substring, not a regular expression.
        matching = self["request"].str.contains(text, regex=False)
        return self._get_elements(self[matching], key=key)
# Select SOLUTION: `LogDicts` is the name bound to the implementation in use.
# LogDicts_KOALA and LogDicts_PANDA define the same methods, so rebinding
# this alias switches between the two.
LogDicts = LogDicts_KOALA
if __name__ == "__main__":
    # Benchmark: time construction and each query method of both
    # implementations on "mini-access-log.txt" and "big-access-log.txt"
    # (expected in the current working directory), then print a table.

    def by_ip_address(one_log_dict):
        """Sort key: the IP address as a list of its integer components."""
        return [int(number) for number in one_log_dict["ip_address"].split(".")]

    timing = defaultdict(dict)  # type: Dict[str, Dict[str, float]]
    animals = dict()  # type: Dict[str, Any]
    # Number of timeit repetitions for the "mini" and the "big" log file.
    Numbers = namedtuple("Numbers", "mini big")
    init_runs = Numbers(100, 10)
    command_runs = Numbers(1000, 100)

    # Statement templates; {0} is replaced by the expression that looks up
    # the instance under test in `animals`.
    list_commands = {
        "1. unsorted_dicts": "{0}.dicts()",
        "1. unsorted_iter": "for _ in {0}.iterdicts(): pass",
        "2. sorted_dicts": "{0}.dicts(key=by_ip_address)",
        "2. sorted_iter": "for _ in {0}.iterdicts(key=by_ip_address): pass",
        "3. earliest": "{0}.earliest()",
        "3. latest": "{0}.latest()",
        "4. for_request": "{0}.for_request('browse_applet_tab')",
        "4. for_ip": "{0}.for_ip('65.55.106.186')",
    }

    # Direct class references: no eval() needed to build the instances.
    solutions = (("koala", LogDicts_KOALA), ("panda", LogDicts_PANDA))

    for size in ["mini", "big"]:
        logfilename = f"{size}-access-log.txt"
        for animal, solution in solutions:
            name = f"{size} {animal}"
            class_name = solution.__name__
            my_animal = f"animals['{name}']"

            # Initialization of Class
            timing[name][f"0. init [{init_runs.mini}|{init_runs.big}]"] = timeit(
                f"my_animal = {class_name}({logfilename!r})",
                setup=f"from __main__ import {class_name}",
                number=getattr(init_runs, size),
            )

            # Measures of different commands
            animals[name] = solution(logfilename)
            for cmd_name, cmd_fn in list_commands.items():
                cmd_name += f" [{command_runs.mini}|{command_runs.big}]"
                timing[name][cmd_name] = timeit(
                    cmd_fn.format(my_animal),
                    setup="from __main__ import animals, by_ip_address",
                    number=getattr(command_runs, size),
                )

    # We transform into a Dataframe for pretty results
    print(pd.DataFrame(timing))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment