Last active
September 27, 2020 09:09
-
-
Save lamchau/cb863219c82e80690d8a23de4e9468c3 to your computer and use it in GitHub Desktop.
Python3 script to download all events from a bugsnag URL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import collections
import datetime
import distutils.spawn
import hashlib
import json
import os
import re
import shutil
import signal
import subprocess
import sys
import time
import urllib
import urllib.parse
from typing import List

import requests
# On-disk cache file for organization/project metadata (see get_projects()).
BUGSNAG_INFO = 'bugsnag.json'
# When True, main() prints an md5 label for every paginated request URL.
DEBUG = False
# HTTP status codes the download loop in main() distinguishes.
HTTP_OK = 200
HTTP_TOO_MANY_REQUESTS = 429
# Personal auth token for api.bugsnag.com; required for every API request.
# Exits at import time when missing so the script fails fast.
BUGSNAG_TOKEN = os.getenv('BUGSNAG_TOKEN')
if not BUGSNAG_TOKEN:
    print('Environment variable BUGSNAG_TOKEN not found')
    sys.exit(1)
# Parsed parts of an app.bugsnag.com error URL.
# NOTE(review): parse_app_url() stores the organization *slug* in the first
# field despite the `organization_id` name -- confirm before renaming.
Project = collections.namedtuple('Project', [
    'organization_id',
    'project_slug',
    'error_id'
])
def parse_app_url(s):
    """Parse an app.bugsnag.com error URL into its identifying parts.

    Returns a Project(organization_slug, project_slug, error_id) namedtuple,
    or None when the string does not contain a recognizable error URL.
    """
    # Dots are escaped so e.g. 'appXbugsnagYcom' no longer matches (the
    # original pattern used bare '.', which matches any character).
    # Error ids are lowercase hex strings.
    regex = re.compile(r'app\.bugsnag\.com/([\w\-]+)/([\w\-]+)/errors/([a-f0-9]+)')
    matches = regex.search(s)
    if not matches:
        return None
    organization_slug, project_slug, error_id = matches.groups()
    return Project(organization_slug, project_slug, error_id)
def get_bugsnag_urls(url):
    """Build the API (events) and app (human-facing) URLs for an error.

    url -- an app.bugsnag.com error URL.
    Returns {'api': <events endpoint>, 'app': <canonical app URL>}.
    Raises ValueError when the URL is not recognizable (the original code
    crashed with an opaque TypeError while unpacking None here).
    """
    project = parse_app_url(url)
    if project is None:
        raise ValueError(f'Unrecognized bugsnag URL: {url}')
    organization_slug, project_slug, error_id = project
    project_id = get_project_id(project_slug)
    query = urllib.parse.urlencode({
        'sort': 'timestamp',
        'direction': 'desc',
        'filters': '',
        'full_reports': 'false'
    }, doseq=True)
    return {
        'api': f'https://api.bugsnag.com/projects/{project_id}/errors/{error_id}/events?{query}',
        'app': f'https://app.bugsnag.com/{organization_slug}/{project_slug}/errors/{error_id}'
    }
def get_projects():
    """Return [{'organization': ..., 'projects': {slug: id}}, ...].

    Results are cached on disk in BUGSNAG_INFO so repeated runs do not
    re-query the bugsnag API.
    """
    # Serve from the on-disk cache when one exists.
    if os.path.exists(BUGSNAG_INFO):
        with open(BUGSNAG_INFO) as cache:
            return json.load(cache)

    auth_headers = {'Authorization': f'token {BUGSNAG_TOKEN}'}
    response = requests.get('https://api.bugsnag.com/user/organizations',
                            headers=auth_headers)

    organizations = []
    for org in response.json():
        org_id = org.get('id')
        listing = requests.get(
            f'https://api.bugsnag.com/organizations/{org_id}/projects',
            headers=auth_headers)
        # Map each project slug to its API id for later lookup.
        slug_to_id = {p.get('slug'): p.get('id') for p in listing.json()}
        organizations.append({
            'organization': org,
            'projects': slug_to_id
        })

    # Persist for the next run before returning.
    with open(BUGSNAG_INFO, 'w') as cache:
        json.dump(organizations, cache, indent=2, ensure_ascii=False)
    return organizations
def get_project_id(project_slug):
    """Resolve a project slug (case-insensitive) to its bugsnag project id.

    Raises Exception when the slug is empty or not found in any organization.
    """
    if project_slug:
        project_slug = project_slug.lower()
        for org in get_projects():
            slug_to_id = org.get('projects')
            if project_slug in slug_to_id:
                return slug_to_id.get(project_slug)
    # Falsy slug, or no organization knows it.
    raise Exception(f'Invalid project {project_slug}')
def camel_to_snake(s):
    """Convert camelCase to snake_case, e.g. 'retryCount' -> 'retry_count'."""
    pieces = []
    for ch in s:
        if ch.isupper():
            # Prefix each uppercase letter with an underscore, then lowercase.
            pieces.append('_')
            pieces.append(ch.lower())
        else:
            pieces.append(ch)
    # A leading capital would otherwise produce a leading underscore.
    return ''.join(pieces).lstrip('_')
def md5(s):
    """Return the hex MD5 digest of a unicode string (used as a debug label)."""
    digest = hashlib.md5(s.encode('utf-8'))
    return digest.hexdigest()
def parse_exception(message):
    """Expand 'xxx' metadata messages into dicts; pass anything else through."""
    return parse_xxx_message(message) if 'xxx' in message else message
def parse_xxx_message(s):
    """Parse the key/value metadata out of an 'xxx' exception message.

    Returns the input unchanged when it is falsy; otherwise a dict mapping
    snake_case keys to their string values.
    """
    if not s:
        return s
    s = s.strip()
    # Drop the trailing 'xxx=[...]' payload; everything before it is the
    # ',m'-separated key=value metadata we care about.
    # NOTE(review): when 'xxx=[' is absent, find() returns -1 and the slice
    # chops the last character -- preserved from the original; confirm intended.
    index = s.find('xxx=[')
    s = s[:index]
    out = {}
    for index, chunk in enumerate(s.split(',m')):
        # The first segment precedes any key=value pair.
        if index == 0:
            continue
        # maxsplit=1 so values that themselves contain '=' still unpack into
        # exactly two parts (the original used maxsplit=2, which could yield
        # three parts and raise ValueError on the unpack).
        key, value = (part.strip() for part in chunk.split('=', 1))
        key = camel_to_snake(key)
        # Strip the single trailing comma left over from the list formatting.
        if value and value[-1] == ',':
            value = value[:-1]
        out[key] = value
    return out
def to_json(filename: str, rows: List[dict]):
    """Append `rows` to `filename` as a JSON array, then empty the list.

    Appending whole arrays yields concatenated JSON documents (not one valid
    document); main() later repairs the file with `jq --slurp flatten`.
    """
    with open(filename, 'a+') as out:
        json.dump(rows, out, indent=2, ensure_ascii=False)
    # Clear in place so the caller's buffer is reset for the next batch.
    del rows[:]
def signal_handler(signum, frame):
    """SIGINT handler: flag the download loop to stop after the current batch.

    The first parameter was named `signal`, shadowing the `signal` module;
    renamed to the conventional `signum`. Handlers are invoked positionally
    by the interpreter, so the rename is caller-compatible.
    """
    global is_running
    is_running = False
def render_header(headers, urls, filename, result_size=None):
    """Print a one-time summary banner before the download loop starts."""
    # Honor an explicit cap; otherwise trust the API's reported total.
    if result_size:
        total_count = result_size
    else:
        total_count = int(headers.get('X-Total-Count'))
    requests_per_minute = int(headers.get('X-RateLimit-Limit'))
    # NOTE(review): 30 appears to be the events-per-page size of the API --
    # confirm against the pagination parameters actually in use.
    events_per_request = 30
    estimated_minutes = total_count / (requests_per_minute * events_per_request)
    started_at = datetime.datetime.now().replace(microsecond=0).isoformat()
    print(' Start Time:', started_at)
    print(' Filename:', filename)
    print('Application URL:', urls.get('app'))
    print(' API URL:', urls.get('api'))
    print(' Limit:', requests_per_minute)
    print(' Total Events:', total_count)
    print(' Estimated Time:', f'{estimated_minutes:.2f} minutes')
def main(app_url, result_size=None):
    """Download every event for a bugsnag error into '<error_id>.json'.

    app_url     -- an app.bugsnag.com error URL
    result_size -- optional cap on the number of events to fetch

    Batches are appended to the output file as they arrive so an abort or
    crash loses nothing; since appended JSON arrays are not one valid
    document, the file is post-processed with `jq --slurp flatten` when jq
    is available.
    """
    global is_running
    is_running = True
    initial_request = True
    # Let Ctrl-C stop the loop cleanly instead of losing the current batch.
    signal.signal(signal.SIGINT, signal_handler)
    urls = get_bugsnag_urls(app_url)
    api_url = urls.get('api')
    headers = {
        'Authorization': f'token {BUGSNAG_TOKEN}',
        'X-Version': '2'
    }
    _, _, error_id = parse_app_url(app_url)
    filename = f'{error_id}.json'
    # Start from scratch on every run.
    if os.path.exists(filename):
        os.remove(filename)
    row_count = 0
    rows = []
    # Used as a stack: the current page URL sits on top; the next page is
    # pushed once the current one succeeds.
    request_urls = [api_url]
    while request_urls:
        if not is_running or (result_size and row_count >= result_size):
            break
        response = requests.get(url=request_urls[-1], headers=headers)
        status_code = response.status_code
        if status_code == HTTP_OK:
            request_urls.pop()
            remaining_limit = int(response.headers.get('X-RateLimit-Remaining'))
            if initial_request:
                render_header(headers=response.headers,
                              urls=urls,
                              filename=filename,
                              result_size=result_size)
                initial_request = False
            # Pagination via the HTTP Link header: '<url>; rel="next"'.
            link = response.headers.get('link')
            if link:
                next_index = link.find('>; rel="next"')
                # Only follow the header when it actually advertises a next
                # page. The original sliced with find()'s -1 sentinel and
                # queued a garbage URL when rel="next" was absent.
                if next_index != -1:
                    link = link[1:next_index]
                    request_urls.append(link)
                    if DEBUG:
                        print(f'url: {md5(link)}')
            if remaining_limit > 0:
                sys.stdout.write(f'\r{remaining_limit} requests remaining.. \033[K')
                sys.stdout.flush()
            for event in response.json():
                # NOTE(review): presumably guards against an error payload
                # (a dict whose iteration yields keys like 'errors') instead
                # of the usual list of events -- confirm against the API.
                if event == 'errors':
                    break
                # the more common use-case would be to extract information
                # to create our own timeseries charts (consider caching with
                # something like timescale) or writing to a csv
                # https://bugsnagapiv2.docs.apiary.io/#reference/errors/events/view-an-event
                for error in event.get('exceptions'):
                    # for most errors, the message is the same -- in rare
                    # circumstances it contains metadata worth parsing
                    message = parse_exception(error.get('message'))
                    if message:
                        rows.append(message)
            row_count += len(rows)
            # Flush the batch to disk (this also clears `rows`).
            to_json(filename, rows)
        elif status_code == HTTP_TOO_MANY_REQUESTS:
            # Honor the API's Retry-After with a visible countdown.
            retry_after = int(response.headers.get('Retry-After'))
            for remaining in range(retry_after, 0, -1):
                if not is_running:
                    break
                sys.stdout.write(f'\rRate limit exceeded: Waiting {remaining} second(s)..\033[K')
                sys.stdout.flush()
                time.sleep(1)
        else:
            # Unexpected status: surface the API error body and stop.
            print(json.dumps(response.json(), indent=2))
            break
    if rows:
        to_json(filename, rows)
    if row_count < 1:
        return
    # Note: only applicable when we're parsing events into an object and don't
    # know all of the fields ahead of time (or expect some to be null).
    #
    # since the number of events can be arbitrarily large, we don't want to
    # store all of them in memory because it's possible the program either
    #
    # 1. OOM crashes
    # 2. unhandled exceptions
    # 3. user cancel/abort
    #
    # so to prevent any data loss, we'll continually write to the file -- but
    # this means that it'll end up as invalid JSON so reprocessing using `jq`
    # is needed to make it valid JSON
    #
    # shutil.which replaces the deprecated distutils.spawn.find_executable
    # (distutils was removed in Python 3.12).
    jq_path = shutil.which('jq')
    if jq_path:
        timestamp = int(time.time())
        # Back up the raw (possibly invalid) file before rewriting in place.
        # The original built this name from a garbled literal instead of the
        # actual output filename.
        backup_file = f'{filename}.{timestamp}'
        shutil.copy(filename, backup_file)
        with open(backup_file, 'r') as in_file:
            with open(filename, 'w') as out_file:
                jq = subprocess.Popen([jq_path, '--slurp', 'flatten'],
                                      stdin=in_file, stdout=out_file)
                # stdout/stderr are not piped, so communicate() returns None
                # for both; the backup is removed on the (normal) clean path.
                (_, error) = jq.communicate()
                if error:
                    print(error)
                else:
                    os.remove(backup_file)
    else:
        # The original message said `json --slurp`; the tool is jq.
        print('WARNING: JSON file may be invalid, use jq --slurp "flatten"')
if __name__ == '__main__':
    # Example:
    # https://app.bugsnag.com/org_slug/project_name/errors/deadbeefb04e580019c9345d
    # (the original assigned an unused example `url` local; removed)
    if len(sys.argv) < 2:
        # Guard against the bare IndexError the original raised with no args.
        print('usage: BUGSNAG_TOKEN=... python3 <script> <bugsnag error URL>')
        sys.exit(1)
    main(sys.argv[1])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment