Python 3 script to download all events from a Bugsnag error URL
#!/usr/bin/env python3
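# usage (BUGSNAG_TOKEN is a Bugsnag personal auth token; the script name is
# whatever this gist was saved as):
#   BUGSNAG_TOKEN=<token> python3 <script>.py \
#     'https://app.bugsnag.com/<org_slug>/<project_slug>/errors/<error_id>'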
import collections
import datetime
import hashlib
import json
import os
import re
import shutil
import signal
import subprocess
import sys
import time
import urllib.parse
from typing import List

import requests

BUGSNAG_INFO = 'bugsnag.json'
DEBUG = False
HTTP_OK = 200
HTTP_TOO_MANY_REQUESTS = 429
BUGSNAG_TOKEN = os.getenv('BUGSNAG_TOKEN')
if not BUGSNAG_TOKEN:
    print('Environment variable BUGSNAG_TOKEN not found')
    sys.exit(1)

Project = collections.namedtuple('Project', [
    'organization_slug',
    'project_slug',
    'error_id'
])


def parse_app_url(s):
    regex = re.compile(r'app\.bugsnag\.com/([\w\-]+)/([\w\-]+)/errors/([a-f0-9]+)')
    matches = regex.search(s)
    if matches:
        organization_slug = matches.group(1)
        project_slug = matches.group(2)
        error_id = matches.group(3)
        return Project(organization_slug, project_slug, error_id)
    return None


def get_bugsnag_urls(url):
    organization_slug, project_slug, error_id = parse_app_url(url)
    project_id = get_project_id(project_slug)
    query = urllib.parse.urlencode({
        'sort': 'timestamp',
        'direction': 'desc',
        'filters': '',
        'full_reports': 'false'
    }, doseq=True)
    return {
        'api': f'https://api.bugsnag.com/projects/{project_id}/errors/{error_id}/events?{query}',
        'app': f'https://app.bugsnag.com/{organization_slug}/{project_slug}/errors/{error_id}'
    }


def get_projects():
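    # organization/project metadata is cached in BUGSNAG_INFO, so repeated
    # runs skip these API calls; delete the file to force a refresh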
    if os.path.exists(BUGSNAG_INFO):
        with open(BUGSNAG_INFO) as f:
            return json.load(f)
    url = 'https://api.bugsnag.com/user/organizations'
    headers = {
        'Authorization': f'token {BUGSNAG_TOKEN}'
    }
    response = requests.get(url, headers=headers)
    organizations = []
    for org in response.json():
        organization_id = org.get('id')
        url = f'https://api.bugsnag.com/organizations/{organization_id}/projects'
        response = requests.get(url, headers=headers)
        projects = {p.get('slug'): p.get('id') for p in response.json()}
        organizations.append({
            'organization': org,
            'projects': projects
        })
    with open(BUGSNAG_INFO, 'w') as f:
        json.dump(organizations, f, indent=2, ensure_ascii=False)
    return organizations


def get_project_id(project_slug):
    if project_slug:
        project_slug = project_slug.lower()
    for org in get_projects():
        projects = org.get('projects')
        if project_slug in projects:
            return projects.get(project_slug)
    raise Exception(f'Invalid project {project_slug}')


def camel_to_snake(s):
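    # e.g. 'errorClass' -> 'error_class'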
    return ''.join('_' + c.lower() if c.isupper() else c for c in s).lstrip('_')


def md5(s):
    return hashlib.md5(s.encode('utf-8')).hexdigest()


def parse_exception(message):
    if 'xxx' in message:
        return parse_xxx_message(message)
    return message


def parse_xxx_message(s):
    if not s:
        return s
    s = s.strip()
    index = s.find('xxx=[')
    s = s[:index]
    out = {}
    for index, x in enumerate(s.split(',m')):
        if index == 0:
            continue
        # maxsplit=1 so values containing '=' still unpack into two parts
        key, value = map(str.strip, x.split('=', 1))
        key = camel_to_snake(key)
        if value.endswith(','):
            value = value[:-1]
        out[key] = value
    return out


def to_json(filename: str, rows: List[dict]):
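    # append mode keeps partial results if the run crashes or is interrupted;
    # the concatenated arrays are repaired into valid JSON by the jq pass in main()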
    with open(filename, 'a+') as f:
        json.dump(rows, f, indent=2, ensure_ascii=False)
    rows.clear()


def signal_handler(signum, frame):
    global is_running
    is_running = False


def render_header(headers, urls, filename, result_size=None):
    total_count = result_size if result_size else int(headers.get('X-Total-Count'))
    requests_per_minute = int(headers.get('X-RateLimit-Limit'))
    events_per_request = 30
    estimated_minutes = total_count / (requests_per_minute * events_per_request)
    print('     Start Time:', datetime.datetime.now().replace(microsecond=0).isoformat())
    print('       Filename:', filename)
    print('Application URL:', urls.get('app'))
    print('        API URL:', urls.get('api'))
    print('          Limit:', requests_per_minute)
    print('   Total Events:', total_count)
    print(' Estimated Time:', f'{estimated_minutes:.2f} minutes')


def main(app_url, result_size=None):
    global is_running
    is_running = True
    initial_request = True
    signal.signal(signal.SIGINT, signal_handler)
    urls = get_bugsnag_urls(app_url)
    api_url = urls.get('api')
    headers = {
        'Authorization': f'token {BUGSNAG_TOKEN}',
        'X-Version': '2'
    }
    _, _, error_id = parse_app_url(app_url)
    filename = f'{error_id}.json'
    if os.path.exists(filename):
        os.remove(filename)
    row_count = 0
    rows = []
    request_urls = [api_url]
    while request_urls:
        if not is_running or (result_size and row_count >= result_size):
            break
        response = requests.get(url=request_urls[-1], headers=headers)
        status_code = response.status_code
        if status_code == HTTP_OK:
            request_urls.pop()
            remaining_limit = int(response.headers.get('X-RateLimit-Remaining'))
            if initial_request:
                render_header(headers=response.headers,
                              urls=urls,
                              filename=filename,
                              result_size=result_size)
                initial_request = False
            link = response.headers.get('link')
            if link:
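                # pagination: the link header looks like
                # '<https://api.bugsnag.com/...>; rel="next"'; strip the leading
                # '<' and trailing '>; rel="next"' to recover the bare URL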
                next_index = link.find('>; rel="next"')
                link = link[1:next_index]
                request_urls.append(link)
                if DEBUG:
                    print(f'url: {md5(link)}')
            if remaining_limit > 0:
                sys.stdout.write(f'\r{remaining_limit} requests remaining.. \033[K')
                sys.stdout.flush()
            for event in response.json():
                if event == 'errors':
                    break
                # the more common use-case would be to extract information
                # to create our own timeseries charts (consider caching with
                # something like timescale) or writing to a csv
                # https://bugsnagapiv2.docs.apiary.io/#reference/errors/events/view-an-event
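                # a hedged sketch of the csv route (assumes a csv.writer named
                # `writer` was opened outside the request loop; 'received_at'
                # and 'context' are illustrative field names from the payload
                # documented above, not guaranteed):
                #   writer.writerow([event.get('received_at'), event.get('context')])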
                for error in event.get('exceptions'):
                    # for most errors the message is the same -- in rare
                    # circumstances it contains metadata worth parsing
                    message = parse_exception(error.get('message'))
                    if message:
                        rows.append(message)
            row_count += len(rows)
            to_json(filename, rows)
        elif status_code == HTTP_TOO_MANY_REQUESTS:
            retry_after = int(response.headers.get('Retry-After'))
            for remaining in range(retry_after, 0, -1):
                if not is_running:
                    break
                sys.stdout.write(f'\rRate limit exceeded: Waiting {remaining} second(s)..\033[K')
                sys.stdout.flush()
                time.sleep(1)
        else:
            print(json.dumps(response.json(), indent=2))
            break
    if rows:
        to_json(filename, rows)
    if row_count < 1:
        return
    # Note: only applicable when we're parsing events into an object and don't
    # know all of the fields ahead of time (or expect some to be null).
    #
    # since the number of events can be arbitrarily large, we don't want to
    # hold all of them in memory, where we'd risk losing everything to
    #
    # 1. OOM crashes
    # 2. unhandled exceptions
    # 3. user cancel/abort
    #
    # so to prevent any data loss, we continually append to the file -- but
    # that leaves it as a series of concatenated JSON arrays (invalid JSON),
    # so a reprocessing pass with `jq` is needed to make it valid
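    # (the same repair can be run by hand: jq --slurp 'flatten' <error_id>.json)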
    jq_path = shutil.which('jq')
    if jq_path:
        timestamp = int(time.time())
        backup_file = f'{filename}.{timestamp}'
        shutil.copy(filename, backup_file)
        with open(backup_file, 'r') as in_file:
            with open(filename, 'w') as out_file:
                jq = subprocess.Popen([jq_path, '--slurp', 'flatten'],
                                      stdin=in_file, stdout=out_file)
                jq.communicate()
                if jq.returncode != 0:
                    print(f'jq failed (exit {jq.returncode}); backup kept at {backup_file}')
                else:
                    os.remove(backup_file)
    else:
        print('WARNING: JSON file may be invalid; repair it with `jq --slurp "flatten"`')


if __name__ == '__main__':
    # e.g. https://app.bugsnag.com/org_slug/project_name/errors/deadbeefb04e580019c9345d
    main(sys.argv[1])