Skip to content

Instantly share code, notes, and snippets.

@yamitzky
Created October 5, 2019 03:34
Show Gist options
  • Save yamitzky/09123937703a3a35fe5cb86d53c4aca0 to your computer and use it in GitHub Desktop.
Save yamitzky/09123937703a3a35fe5cb86d53c4aca0 to your computer and use it in GitHub Desktop.
A tool that analyzes the list of queries issued in Athena and identifies the ones with large data-scan volumes
from typing import List, Iterable, Iterator
import re
from itertools import groupby as _groupby
import datetime
import logging
import boto3
# Log at INFO so the per-region progress messages in the fetch loop are visible.
logging.basicConfig(level=logging.INFO)
# AWS regions whose Athena query-execution history will be scanned.
REGIONS = ['ap-northeast-1', 'us-east-1']
# Cutoff instant (naive, treated as UTC — compare with utcfromtimestamp below):
# executions completed before this are not analyzed.
START = datetime.datetime.fromisoformat('2019-09-30T00:00:00.000000')
def groupby(li: Iterable, key) -> Iterator:
    """Group *li* by *key*, yielding each key exactly once.

    itertools.groupby only merges *consecutive* equal keys, so the input
    is sorted by the same key function first.
    """
    ordered = sorted(li, key=key)
    return _groupby(ordered, key=key)
def agg_by_scan(groups: Iterable) -> List:
    """Sum DataScannedInBytes per group; return (key, total) pairs ascending by total."""
    totals = [
        (group_key, sum(e['Statistics']['DataScannedInBytes'] for e in members))
        for group_key, members in groups
    ]
    totals.sort(key=lambda pair: pair[1])
    return totals
def humanize(b: int) -> str:
    """Format a byte count as a 3-char right-aligned number plus a decimal
    unit suffix (K/M/G/T/P, powers of 1000), e.g. 1500 -> '  1 K'.

    Values below 1000 get a trailing space instead of a unit letter, so
    every result is the same width and tab-aligns in the report tables.
    """
    # Largest unit first; the if/elif ladder of the original collapses
    # into one table-driven loop with identical thresholds and output.
    for factor, suffix in ((10**15, 'P'), (10**12, 'T'), (10**9, 'G'),
                           (10**6, 'M'), (10**3, 'K')):
        if b >= factor:
            return f'{b // factor: >3} {suffix}'
    return f'{b: >3} '
executions: List[dict] = []

# Fetch query executions for each region, paging backwards through history
# until we hit an execution completed before START (or run out of pages).
for region_name in REGIONS:
    athena = boto3.client('athena', region_name=region_name)
    next_token = None
    done = False
    while not done:
        args = {'MaxResults': 50}
        if next_token:
            args['NextToken'] = next_token
        res = athena.list_query_executions(**args)
        ids = res['QueryExecutionIds']
        # BUG FIX: the final page of results has no 'NextToken' key, so the
        # original res['NextToken'] raised KeyError before the loop could
        # finish. Use .get and stop paginating when the token is absent.
        next_token = res.get('NextToken')
        if next_token is None:
            done = True
        if not ids:
            raise Exception('no result!')
        res = athena.batch_get_query_execution(QueryExecutionIds=ids)
        time = None
        for execution in res['QueryExecutions']:
            # Skip executions with no statistics, zero scan, or no completion time
            # (e.g. failed or still-running queries).
            if not execution.get('Statistics'):
                continue
            if not execution['Statistics']['DataScannedInBytes']:
                continue
            if not execution['Status'].get('CompletionDateTime'):
                continue
            # Normalize the (possibly tz-aware) completion time to naive UTC
            # so it compares cleanly with the naive START constant.
            time = datetime.datetime.utcfromtimestamp(
                execution['Status']['CompletionDateTime'].timestamp())
            if time < START:
                # Reached the cutoff; finish this page, then stop paging.
                done = True
            executions.append(execution)
        if time:
            logging.info("%s %s", region_name, time)
# Extract the referenced tables and the Redash query id from each SQL text.
table_pattern = re.compile(r'from\s+([a-zA-Z0-9.`"\'_-]+)', re.IGNORECASE)
as_pattern = re.compile(r'([a-zA-Z0-9.`"\'_-]+)\s+as', re.IGNORECASE)
redash_pattern = re.compile(r'Query ID: (\d+)')
for e in executions:
    sql = e['Query']
    # Names followed by AS are aliases, not real tables — drop them.
    aliases = as_pattern.findall(sql)
    real_tables = [t for t in table_pattern.findall(sql) if t not in aliases]
    e['tables'] = ','.join(sorted(real_tables))
    # Redash embeds "Query ID: <n>" in the SQL comment; -1 means not found.
    redash_matches = redash_pattern.findall(sql)
    e['redash_id'] = int(redash_matches[0]) if redash_matches else -1
# Report the ten individual executions with the largest scan sizes.
print('\n# Top Queries')
print('scan\tid\ttable')
print('--------------------------')
print('...\t...\t...')
by_scan = sorted(executions, key=lambda e: e['Statistics']['DataScannedInBytes'])
for e in by_scan[-10:]:
    scanned = e['Statistics']['DataScannedInBytes']
    print(f'{humanize(scanned)}\t{e["redash_id"]}\t{e["tables"]}')
# Aggregate scan totals per Redash query id and report the top ten.
groups = groupby(executions, lambda e: e['redash_id'])
print('\n# Top Queries by Redash Query ID')
print('scan\tredash_id')
print('----------------')
print('...\t...')
for redash_id, scan in agg_by_scan(groups)[-10:]:
    # FIX: renamed local `id` -> `shown_id`; `id` shadowed the builtin.
    # -1 marks "no Redash id found in the SQL" — display it as blank.
    shown_id = "" if redash_id == -1 else redash_id
    print(f'{humanize(scan)}\t{shown_id}')
# Aggregate scan totals per table combination and report the top ten.
groups = groupby(executions, lambda e: e['tables'])
print('\n# Top Queries by Table')
print('scan\ttable')
print('----------------')
print('...\t...')
heaviest = agg_by_scan(groups)[-10:]
for tables, scan in heaviest:
    print(f'{humanize(scan)}\t{tables}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment