Skip to content

Instantly share code, notes, and snippets.

@migurski
Created July 23, 2015 05:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save migurski/d28b6ea110493dddbbea to your computer and use it in GitHub Desktop.
Save migurski/d28b6ea110493dddbbea to your computer and use it in GitHub Desktop.
Converting from OpenAddresses stats table to runs and sets table
from __future__ import print_function
from os import environ
from psycopg2 import connect
from psycopg2.extras import DictCursor, Json
from itertools import count, groupby
from datetime import timedelta, datetime
from operator import itemgetter
from uritemplate import expand
from dateutil.parser import parse
from dateutil.tz import tzutc
from requests import get, head
# Connection string for the database holding the jobs, runs and sets tables
# (the destination of this conversion). Read from the DATABASE_URL variable.
MACHINE_DB = environ['DATABASE_URL']
# Connection string for the database holding the legacy stats table
# (the source of this conversion).
DASHBOARD_DB = environ['DASHBOARD_DB']
# URI template for listing commits of the openaddresses repository; the
# optional `until` query parameter is filled in with a set's start time.
COMMIT_URL = 'https://api.github.com/repos/openaddresses/openaddresses/commits{?until}'
# URI template for a set's index page; only its Last-Modified header is read,
# to estimate when the set finished.
STATES_URL = 'http://data.openaddresses.io/runs/{tsname}/index.html'
# (username, password) pair passed as `auth=` to requests when calling the
# GitHub API: the token goes in the username slot.
github_auth = environ['GITHUB_TOKEN'], 'x-oauth-basic'
def get_stats():
    """Return every row of the dashboard ``stats`` table as a list.

    Rows are ordered by ``tsname`` descending (then ``source`` descending), so
    all rows sharing a ``tsname`` are adjacent and can be fed straight into
    ``itertools.groupby``. Each row is a psycopg2 ``DictRow`` and supports
    access by column name.
    """
    query = '''SELECT tsname, source, version, process_time, cache_time,
address_count, geometry_type, processed_url, cache_url,
sample_url, output_url, fingerprint
FROM stats
ORDER BY tsname DESC, source DESC
-- LIMIT 10
'''
    conn = connect(DASHBOARD_DB, cursor_factory=DictCursor)
    try:
        # `with conn` only commits or rolls back the transaction; psycopg2
        # does not close the connection on exit, hence the explicit finally.
        with conn:
            with conn.cursor() as db:
                db.execute(query)
                return list(db)
    finally:
        conn.close()
def _last_modified(url):
    """Return the parsed Last-Modified header from a HEAD of *url*, or None."""
    header = head(url).headers.get('last-modified')
    return parse(header) if header else None


def _run_state(run):
    """Build the JSON-serializable state dict stored alongside a run row."""
    state = {
        'source': run['source'], 'version': run['version'],
        'sample': run['sample_url'], 'cache': run['cache_url'],
        'geometry type': run['geometry_type'], 'fingerprint': run['fingerprint'],
        'address count': run['address_count'], 'output': run['output_url'],
        'processed': run['processed_url'], 'cache time': None, 'process time': None
    }

    # Durations are stored as seconds in stats; render them as H:MM:SS text.
    if run['cache_time']:
        state['cache time'] = str(timedelta(seconds=run['cache_time']))

    if run['process_time']:
        state['process time'] = str(timedelta(seconds=run['process_time']))

    return state


def _print_id_ranges(db):
    """Print the current MIN/MAX id of the runs and sets tables."""
    db.execute('SELECT MIN(id), MAX(id) FROM runs')
    print('runs ids: {} - {}'.format(*db.fetchone()))

    db.execute('SELECT MIN(id), MAX(id) FROM sets')
    print('sets ids: {} - {}'.format(*db.fetchone()))


if __name__ == '__main__':
    # Step 1: find the lowest existing job sequence number. New ids are
    # handed out counting downward from just below it, shared between sets
    # and runs.
    conn = connect(MACHINE_DB)
    try:
        # psycopg2's `with conn` ends the transaction but does not close the
        # connection, so close it explicitly.
        with conn:
            with conn.cursor() as db:
                db.execute('SELECT MIN(sequence) FROM jobs')
                (start, ) = db.fetchone()
    finally:
        conn.close()

    sequence = count(start-1, -1)
    # get_stats() returns rows ordered by tsname, which groupby requires.
    sets = groupby(get_stats(), itemgetter('tsname'))
    queries = list()

    # Step 2: build (but do not yet execute) one INSERT per set and per run.
    for (tsname, set_runs) in sets:
        print('--' * 40)

        set_id = next(sequence)
        # tsname is a Unix timestamp in text form marking the set's start.
        datetime_start = datetime.fromtimestamp(float(tsname), tzutc())

        # Ask GitHub for the newest commit at or before the set's start.
        commit_url = expand(COMMIT_URL, dict(until=datetime_start.isoformat()))
        response = get(commit_url, auth=github_auth)
        # Fail with the HTTP error itself rather than an opaque KeyError when
        # the API answers with an error object instead of a commit list.
        response.raise_for_status()
        commit_sha = response.json()[0].get('sha')

        # The index page's modification time stands in for the set's end.
        states_url = expand(STATES_URL, dict(tsname=tsname))
        datetime_end = _last_modified(states_url)

        queries.append(('''INSERT INTO sets
(id, commit_sha, datetime_start, datetime_end)
VALUES (%s, %s, %s, %s)''',
                        (set_id, commit_sha, datetime_start, datetime_end)))

        # datetime_end is None when the index page has no Last-Modified
        # header; subtracting from None would raise TypeError.
        duration = (datetime_end - datetime_start) if datetime_end else None
        print(datetime_start, commit_sha, duration)
        print('- ' * 40)

        for run in set_runs:
            state = _run_state(run)

            # Prefer the output file's modification time. Without an output
            # URL (or without the header) fall back to the set's start time
            # instead of reusing a stale value from the previous run.
            datetime_tz = None
            if run['output_url']:
                datetime_tz = _last_modified(run['output_url'])
            if datetime_tz is None:
                datetime_tz = datetime_start

            run_id = next(sequence)
            source_path = 'sources/{source}'.format(**run)
            # A run counts as successful only if all three artifacts exist.
            status = bool(run['sample_url'] and run['cache_url'] and run['processed_url'])

            print(status, datetime_tz, source_path)

            queries.append(('''INSERT INTO runs
(id, source_path, datetime_tz, state,
status, set_id, commit_sha)
VALUES (%s, %s, %s, %s, %s, %s, %s)''',
                            (run_id, source_path, datetime_tz, Json(state),
                             status, set_id, commit_sha)))

    print('--' * 40)

    # Step 3: replace previously converted rows (negative ids) with the
    # queued inserts, all inside a single transaction.
    conn = connect(MACHINE_DB, cursor_factory=DictCursor)
    try:
        with conn:
            with conn.cursor() as db:
                db.execute('DELETE FROM runs WHERE id < 0')
                db.execute('DELETE FROM sets WHERE id < 0')

                _print_id_ranges(db)

                for (query, values) in queries:
                    db.execute(query, values)

                _print_id_ranges(db)

                db.execute('SELECT * FROM runs ORDER BY id ASC LIMIT 1')
                print(db.fetchone())

                db.execute('SELECT * FROM sets ORDER BY id ASC LIMIT 1')
                print(db.fetchone())

                # Dry-run switch: uncomment to abort here, which makes the
                # `with conn` block roll the whole transaction back.
                # raise Exception('Nope')
    finally:
        conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment