Skip to content

Instantly share code, notes, and snippets.

@kevinjqiu
Created March 8, 2016 18:13
Show Gist options
  • Save kevinjqiu/88c6d4b5a37595e5f841 to your computer and use it in GitHub Desktop.
Save kevinjqiu/88c6d4b5a37595e5f841 to your computer and use it in GitHub Desktop.
import abc
import json
import re
import arrow
def read_entries(path='buy.json'):
    """Load newline-delimited JSON records from *path*.

    Every non-blank line of the file is decoded with ``json.loads``.

    Args:
        path: File to read. Defaults to ``'buy.json'``, the name that was
            previously hard-coded, so existing zero-argument calls behave
            the same.

    Returns:
        A list holding one decoded object per non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # JSON interchange text is UTF-8, so do not depend on the platform default.
    with open(path, encoding='utf-8') as f:
        # Lines read from a file keep their trailing newline, so a bare
        # truthiness test never filters blank lines; strip before testing.
        # Building the list here also finishes all reading before the file
        # is closed.
        return [json.loads(line) for line in f if line.strip()]
def har(obj):
    """Return the HAR representation of *obj* by invoking its ``__har__`` hook."""
    to_har = obj.__har__
    return to_har()
class HARMixin(metaclass=abc.ABCMeta):
    """Abstract base for objects that can be serialised to a HAR fragment.

    Subclasses must implement ``__har__``. The metaclass is declared with the
    Python 3 ``metaclass=`` keyword: a ``__metaclass__`` class attribute is
    ignored by Python 3, which left ``@abstractmethod`` unenforced.
    """

    @abc.abstractmethod
    def __har__(self):
        """Return a dict representing the HAR entry."""
class HARLog(HARMixin):
    """Top-level HAR ``log`` object built from a sequence of round-trip entries.

    A single page record is derived from the first entry, and every exported
    entry is tagged with that page's id.
    """

    version = '1.2'

    def __init__(self, roundtrip_entries):
        """Keep the entries and build the page record from the first one.

        ``roundtrip_entries`` must be non-empty (checked with ``assert``).
        The first entry must expose ``start_time`` and ``request_line``.
        """
        assert len(roundtrip_entries) > 0
        self.entries = roundtrip_entries
        first_entry = self.entries[0]
        self.pages = [self._get_page_from_entry(first_entry)]

    def __har__(self):
        """Return ``{'log': {...}}`` with version, pages, creator and entries."""
        log = dict(
            version=self.version,
            pages=self.pages,
            creator={'name': 'xray', 'version': '0.1'},
            entries=[har(item) for item in self.entries],
        )
        # Attach every exported entry to the single page of this log.
        for fragment in log['entries']:
            fragment['pageref'] = self.pages[0]['id']
        return {'log': log}

    def _get_page_from_entry(self, entry):
        """Build the HAR page dict for *entry*; page timings are reported as -1."""
        started = entry.start_time.isoformat()
        title = entry.request_line
        timings = {
            "onContentLoad": -1,
            "onLoad": -1,
        }
        return {
            'id': 'page1',
            'startedDateTime': started,
            'title': title,
            'pageTimings': timings,
        }
class LogEntry(HARMixin):
    """One parsed log record (base class for request and response entries).

    Attributes set by ``__init__``:
        raw: The mapping passed in; ``_time``, ``host``, ``source`` and
            ``_raw`` are read from it.
        time, host, source: Values of the corresponding ``raw`` keys.
        line: The marker line of the record, as passed by the caller.
        abbrev_host: ``host`` up to the first ``'.points.com'``.
        abbrev_source: Last path component of ``source`` up to ``'.log'``.
        source_id: ``'<abbrev_host>:<abbrev_source>'``.
        host_class: ``abbrev_host`` without its final character
            (presumably a per-host numeric suffix -- TODO confirm).
        pid: Always ``None`` at present (see the comment in ``__init__``).
        headers, header_size, body, body_size, content_type: Filled in by
            ``parse_headers_and_body``.
    """

    # A line made up only of dashes separates sections of the raw text.
    _SEPARATOR_RE = re.compile(r'[-]+$')
    # A line such as "Body: ..." introduces the body.
    _BODY_RE = re.compile(r'Body.*:.*')

    def __init__(self, raw, line):
        """Extract identifying fields from *raw* and parse headers and body.

        Args:
            raw: Mapping with ``_time``, ``host``, ``source`` and ``_raw``.
            line: The marker line inside ``raw['_raw']`` after which the
                header lines start.

        Raises:
            AttributeError: If ``host`` or ``source`` is missing from *raw*.
        """
        self.raw = raw
        self.time = self.raw.get('_time')
        self.host = self.raw.get('host')
        self.source = self.raw.get('source')
        self.line = line
        self.abbrev_host = self.host.split('.points.com')[0]
        self.abbrev_source = self.source.split('/')[-1].split('.log')[0]
        self.source_id = '{}:{}'.format(self.abbrev_host, self.abbrev_source)
        self.host_class = self.abbrev_host[:-1]
        # PID extraction (searching the raw text for r'pid:(\d+)') is
        # currently disabled. The attribute is still defined so that code
        # reading ``entry.pid`` sees None instead of raising AttributeError.
        self.pid = None
        self.parse_headers_and_body()

    def parse_headers_and_body(self):
        """Split ``raw['_raw']`` into header dicts and a body string.

        Lines are scanned in order:
          * the line equal to ``self.line`` switches to header mode;
          * a dashes-only line leaves both header and body mode;
          * a ``Body...:`` line starts the body with the text after the
            first colon;
          * any other line without a colon is treated as body text;
          * in header mode a ``name: value`` line becomes a header;
          * in body mode the line is appended to the body.

        ``body_size`` comes from a ``content-length`` header when present and
        non-zero, otherwise it is the length of the parsed body text.

        Raises:
            ValueError: If a ``content-length`` header is not an integer.
        """
        self.headers = []
        self.header_size, self.body_size = 0, 0
        # Always defined so readers can test it even when the record carries
        # no content-type header.
        self.content_type = None

        body_lines = []
        in_header = False
        in_body = False
        for text in self.raw['_raw'].splitlines():
            if text == self.line:
                in_header = True
                continue
            if self._SEPARATOR_RE.match(text):
                in_header = False
                in_body = False
                continue
            if self._BODY_RE.match(text):
                in_header = False
                in_body = True
                # Split on the first colon only; a body such as
                # 'Body: {"a": "b"}' contains further colons that belong
                # to the payload.
                body_lines.append(text.split(':', 1)[1])
                continue
            if ':' not in text:
                in_header = False
                in_body = True
                body_lines.append(text)
                continue
            if in_header:
                self.header_size += len(text)
                header_name, header_value = text.split(':', 1)
                # Drop the whitespace around the separator so values such
                # as the MIME type do not carry a leading space.
                header_name = header_name.strip().lower()
                header_value = header_value.strip()
                if header_name == 'content-length':
                    self.body_size = int(header_value)
                if header_name == 'content-type':
                    self.content_type = header_value
                self.headers.append({'name': header_name, 'value': header_value})
                continue
            if in_body:
                body_lines.append(text)

        self.body = '\n'.join(body_lines)
        if not self.body_size:
            self.body_size = len(self.body)

    def __str__(self):
        """Return ``'<time>, <source_id>, <line>'``."""
        return '{}, {}, {}'.format(self.time, self.source_id, self.line)
class RequestLogEntry(LogEntry):
    """Log entry whose marker line reads ``Request: <method> <url>``."""

    type = 'request'

    def __init__(self, *args, **kwargs):
        """Parse the base fields, then split the request line into method and URL."""
        super().__init__(*args, **kwargs)
        pieces = self.line.split('Request: ')
        self.request_line = pieces[1]
        # Exactly two space-separated tokens are expected here.
        method, url = self.request_line.split(' ')
        self.method = method
        self.url = url

    def __repr__(self):
        template = '<REQUEST: {source_id} {method} {url}>'
        attributes = vars(self)
        return template.format(**attributes)

    def __har__(self):
        """Return the HAR ``request`` object for this entry."""
        return dict(
            httpVersion='unknown',
            method=self.method,
            url=self.url,
            headers=self.headers,
            queryString=[],
            cookies=[],
            headersSize=self.header_size,
            bodySize=self.body_size,
        )
class ResponseLogEntry(LogEntry):
    """Log entry whose marker line reads ``Response: status code: <int>``."""

    type = 'response'

    def __init__(self, *args, **kwargs):
        """Parse the base fields, then read the integer status code.

        Raises:
            IndexError: If the marker line lacks ``'Response: status code: '``.
            ValueError: If the text after that prefix is not an integer.
        """
        super().__init__(*args, **kwargs)
        self.status_code = int(self.line.split('Response: status code: ')[1])

    def __repr__(self):
        return '<RESPONSE: {source_id} {status_code}>'.format(**vars(self))

    def __har__(self):
        """Return the HAR ``response`` object for this entry.

        ``mimeType`` falls back to ``'*/*'`` when no content-type was parsed.
        """
        # The attribute may be absent when the record carried no content-type
        # header; a plain ``self.content_type`` lookup would then raise
        # AttributeError before the '*/*' fallback could apply.
        mime_type = getattr(self, 'content_type', None) or '*/*'
        return {
            'httpVersion': 'unknown',
            'status': self.status_code,
            'statusText': '',
            'headers': self.headers,
            'cookies': [],
            'content': {
                'size': self.body_size,
                'mimeType': mime_type,
                'text': self.body,
            },
            'redirectURL': '',
            'headersSize': self.header_size,
            'bodySize': self.body_size,
        }
def make_entry(entry):
    """Turn one decoded record into a request or response log entry.

    Returns ``None`` when the record has no ``result``, no ``_raw`` text, or
    no line starting with ``Request`` / ``Response``. At most one such line
    may be present (checked with ``assert``).
    """
    result = entry.get('result')
    raw_text = result.get('_raw') if result is not None else None
    if not raw_text:
        return None

    markers = [text for text in raw_text.splitlines()
               if text.startswith(('Request', 'Response'))]
    assert len(markers) <= 1
    if not markers:
        return None

    marker = markers[0]
    entry_cls = RequestLogEntry if marker.startswith('Request') else ResponseLogEntry
    return entry_cls(result, marker)
class RequestTreeNode(HARMixin):
    """A request, its (optional) response, and the requests nested under it.

    Attributes:
        request: The request entry that opened this node.
        response: The matching response entry, or ``None`` while the node
            is still incomplete.
        child_nodes: Nodes for requests attached beneath this one.
        guessed: ``False`` initially; set to ``True`` from outside when the
            response was matched heuristically.
    """

    def __init__(self, request_entry, response_entry=None):
        self.request = request_entry
        self.response = response_entry
        self.child_nodes = []
        self.guessed = False

    def __repr__(self):
        # ``duration`` is None until a response is attached; calling
        # ``total_seconds()`` unconditionally made repr() of an incomplete
        # node raise AttributeError.
        elapsed = self.duration
        return (
            '<RequestTreeNode:\n'
            '    source_id={source_id}\n'
            '    start_time={start_time}\n'
            '    duration={duration}\n'
            '    request={request!r}\n'
            '    response={response!r}>').format(
                source_id=self.source_id,
                start_time=self.start_time,
                duration=elapsed.total_seconds() if elapsed is not None else None,
                request=self.request,
                response=self.response)

    def __har__(self):
        """Return the HAR ``entry`` object for this node.

        The node must be complete: the duration and the response fragment
        are both derived from ``self.response``. The whole duration, in
        milliseconds, is reported as ``time`` and as the ``send`` timing.
        """
        duration = self.duration.total_seconds() * 1000
        return {
            'startedDateTime': self.start_time.isoformat(),
            'time': duration,
            'request': har(self.request),
            'response': har(self.response),
            'timings': {'send': duration, 'wait': -1, 'receive': -1},
            'pageref': '',
            'cache': {},
        }

    @property
    def source_id(self):
        """Source id of the request entry."""
        return self.request.source_id

    @property
    def start_time(self):
        """Request timestamp parsed by ``arrow`` into a ``datetime``."""
        return arrow.get(self.request.time).datetime

    @property
    def end_time(self):
        """Response timestamp parsed by ``arrow``; requires a response."""
        return arrow.get(self.response.time).datetime

    @property
    def duration(self):
        """``end_time - start_time``, or ``None`` when there is no response."""
        if self.response:
            return self.end_time - self.start_time
        return None

    @property
    def request_line(self):
        """``'<method> <url>'`` of the request entry."""
        return '{} {}'.format(self.request.method, self.request.url)

    @property
    def response_status(self):
        """Status code of the response, or ``None`` when there is none."""
        if self.response:
            return self.response.status_code
        return None

    @property
    def is_complete(self):
        """``True`` once a response has been attached."""
        return self.response is not None

    def get_all_incomplete_nodes_by_source_id(self, source_id):
        """Return incomplete nodes of this subtree whose source id matches.

        The result is in depth-first, parent-before-children order.
        """
        incomplete_nodes = []
        if not self.is_complete and self.source_id == source_id:
            incomplete_nodes.append(self)
        for node in self.child_nodes:
            incomplete_nodes.extend(
                node.get_all_incomplete_nodes_by_source_id(source_id))
        return incomplete_nodes

    def render(self, indentation=0):
        """Return a multi-line text rendering of this subtree.

        Args:
            indentation: Number of leading spaces for this node; each level
                of children is indented by one more. Defaults to 0.

        Returns:
            One line per node, with the duration in seconds (or ``'N/A'``),
            the source id, a ``*`` marker for guessed matches, the request
            line and the response status.
        """
        elapsed = self.duration
        lines = ['{identation}{duration:>8} {source_id:40} {guessed}{request_line} {response_status}'.format(
            identation=' ' * indentation,
            duration=elapsed.total_seconds() if elapsed is not None else 'N/A',
            source_id=self.request.source_id,
            request_line=self.request_line,
            guessed='*' if self.guessed else '',
            response_status=self.response_status)]
        lines.extend(child_node.render(indentation + 1)
                     for child_node in self.child_nodes)
        return '\n'.join(lines)
def create_request_tree(log_entries):
    """Create the request tree from the log entries.

    The entries are assumed to be in chronological order and the first one
    is used as the root request. A request entry becomes a child of the most
    recent request that is still waiting for its response. A response entry
    is attached to an incomplete node with the same ``source_id``; when
    several qualify, one is picked heuristically and flagged ``guessed``.

    Args:
        log_entries: Iterable of entries exposing ``type`` (``'request'`` or
            ``'response'``) and ``source_id``. Entries of any other type are
            reported with ``print`` and otherwise ignored.

    Returns:
        The root node of the request tree, or ``None`` for empty input.

    Raises:
        RuntimeError: If a response has no incomplete request to attach to.
    """

    def guess_candidate(candidate_nodes, response_entry):
        """Pick the most plausible open request for *response_entry*."""
        # Entries do not necessarily carry a ``pid`` attribute, so read it
        # defensively instead of raising AttributeError.
        pid = getattr(response_entry, 'pid', None)
        if pid:
            same_pid = [node for node in candidate_nodes
                        if getattr(node.request, 'pid', None) == pid]
            # Only narrow the choice when that leaves something to choose
            # from; an empty list would break the selection below.
            if same_pid:
                candidate_nodes = same_pid

        status = response_entry.status_code
        if status == 200:
            # Prefer a GET request. Fall back to the first candidate rather
            # than letting next() raise StopIteration when there is none.
            candidate_node = next(
                (node for node in candidate_nodes
                 if node.request.method == 'GET'),
                candidate_nodes[0])
        elif status in (201, 204):
            # Prefer the request whose URL appears in a Location header of
            # the response; otherwise take the first candidate.
            raw_response = response_entry.raw['_raw']
            candidate_node = next(
                (node for node in candidate_nodes
                 if 'Location: {}'.format(node.request.url) in raw_response),
                candidate_nodes[0])
        else:
            candidate_node = candidate_nodes[-1]
        candidate_node.guessed = True
        return candidate_node

    def handle_request_entry(entry, root, request_stack):
        new_request = RequestTreeNode(entry)
        # NOTE(review): when the stack is empty the new node is not linked
        # to any parent, so it is unreachable from ``root`` -- confirm
        # whether it should be attached somewhere.
        if request_stack:
            request_stack[-1].child_nodes.append(new_request)
        request_stack.append(new_request)

    def handle_response_entry(entry, root, request_stack):
        candidate_nodes = root.get_all_incomplete_nodes_by_source_id(
            entry.source_id)
        if not candidate_nodes:
            raise RuntimeError('Something went wrong')
        if len(candidate_nodes) > 1:
            # Multiple candidates located; guess which request this
            # response belongs to.
            candidate_node = guess_candidate(candidate_nodes, entry)
        else:
            candidate_node = candidate_nodes[0]
        candidate_node.response = entry
        # Clean the stack by popping completed nodes off its top.
        while request_stack and request_stack[-1].is_complete:
            request_stack.pop()

    def handle_unknown_entry(entry, root, request_stack):
        print('unknown entry: {}'.format(entry))

    handlers = {
        'request': handle_request_entry,
        'response': handle_response_entry,
    }

    log_entries = list(log_entries)
    if not log_entries:
        return None

    root_request = RequestTreeNode(log_entries[0])
    request_stack = [root_request]
    for entry in log_entries[1:]:
        handler = handlers.get(entry.type, handle_unknown_entry)
        handler(entry, root_request, request_stack)
    return root_request
if __name__ == '__main__':
    import pprint
    import collections

    # Build an entry for every record, drop the records make_entry rejected,
    # and process what remains in the reverse of file order.
    parsed = [item for item in map(make_entry, read_entries())
              if item is not None]
    entries = parsed[::-1]

    requests = [e for e in entries if e.type == 'request']
    print('# of requests: {}'.format(len(requests)))

    responses = [e for e in entries if e.type == 'response']
    print('# of responses: {}'.format(len(responses)))

    counts = collections.Counter(req.request_line for req in requests)
    print('Most common requests:')
    pprint.pprint(counts.most_common(10))

    counts = collections.Counter(
        req.request_line for req in requests if req.method == 'GET')
    print('Most common GET requests:')
    pprint.pprint(counts.most_common(10))

    print('Most common hosts:')
    counts = collections.Counter(req.host_class for req in requests)
    print(counts.most_common(10))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment