Skip to content

Instantly share code, notes, and snippets.

@kapcod
Created August 16, 2022 16:05
Show Gist options
  • Save kapcod/128699f4e80df44e26eaec57fb0366a9 to your computer and use it in GitHub Desktop.
Save kapcod/128699f4e80df44e26eaec57fb0366a9 to your computer and use it in GitHub Desktop.
Exports Athena history in hours json.gz files on S3 for storage and analysis. Can then be analyzed using Athena itself.
import boto3
import gzip
import json
import time
import re
def export(workgroup, region, hours=1):
athena = boto3.client('athena', region_name=region)
current_hour_hist = []
next_token = None
now_ts = time.time()
top_ts = cur_hour_ts = int(now_ts / 3600) * 3600
bottom_ts = cur_hour_ts - hours * 3600
ts = None
scanned = 0
camel_case_rx = re.compile(r'(?<!^)(?=[A-Z])')
while not ts or ts >= bottom_ts:
params = {"WorkGroup": workgroup}
if next_token: params['NextToken'] = next_token
res = athena.list_query_executions(**params)
next_token = res['NextToken']
data = athena.batch_get_query_execution(QueryExecutionIds=res['QueryExecutionIds'])['QueryExecutions']
data = [d for d in data if d['Status'].get('CompletionDateTime')]
sort_by_time = lambda x: x['Status']['CompletionDateTime'].timestamp()
data.sort(key=sort_by_time, reverse=True)
for d in data:
scanned += 1
for f in ['Status', 'Statistics', 'ResultConfiguration', 'QueryExecutionContext']:
d.update(d.pop(f))
ts = d['CompletionDateTime'].timestamp()
if ts < cur_hour_ts:
save(workgroup, cur_hour_ts, current_hour_hist, region)
current_hour_hist = []
cur_hour_ts -= 3600
if ts < bottom_ts: break
if ts >= top_ts: continue
for f in ['SubmissionDateTime', 'CompletionDateTime']:
d[f] = d[f].strftime('%Y-%m-%d %H:%M:%S')
for f in list(d.keys()):
d[camel_case_rx.sub('_', f).lower()] = d.pop(f)
current_hour_hist.append(json.dumps(d))
print('\rScanned: {}, time: {}'.format(scanned, time.strftime('%Y-%m-%d %H:%M', time.gmtime(ts))))
def save(workgroup, cur_hour_ts, hist, region):
if not hist: return
s3 = boto3.client('s3', region_name=region)
path = time.strftime("athena_history/date=%Y-%m-%d/" + workgroup + "-%H.json.gz", time.gmtime(cur_hour_ts))
data = gzip.compress(bytes('\n'.join(hist), 'utf8'))
s3.put_object(Bucket='seekingalpha-data-dev', Key=path, Body=data)
print('Saved: {}: {}'.format(path, len(hist)))
def export_all(region, hours=1):
athena = boto3.client('athena', region_name=region)
for workgroup in athena.list_work_groups()['WorkGroups']:
export(workgroup, region, hours)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment