Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Converting from the OpenAddresses stats table to the runs and sets tables
from __future__ import print_function
from os import environ
from psycopg2 import connect
from psycopg2.extras import DictCursor, Json
from itertools import count, groupby
from datetime import timedelta, datetime
from operator import itemgetter
from uritemplate import expand
from dateutil.parser import parse
from dateutil.tz import tzutc
from requests import get, head
# Database connection strings, taken from the environment. A missing
# variable raises KeyError at import time.
MACHINE_DB = environ['DATABASE_URL']
DASHBOARD_DB = environ['DASHBOARD_DB']

# URI templates: COMMIT_URL takes an `until` query parameter, STATES_URL a
# `tsname` path segment.
COMMIT_URL = 'https://api.github.com/repos/openaddresses/openaddresses/commits{?until}'
STATES_URL = 'http://data.openaddresses.io/runs/{tsname}/index.html'

# Two-item credential tuple handed to requests as `auth=`: the Github token
# followed by the literal string 'x-oauth-basic'.
github_auth = (environ['GITHUB_TOKEN'], 'x-oauth-basic')
def get_stats():
    """Return every row of the dashboard `stats` table as a list.

    Rows are fetched through a DictCursor, so each one can be indexed by
    column name. They are ordered by `tsname` descending, then `source`
    descending, which keeps all rows sharing a `tsname` contiguous.

    The connection is closed explicitly: psycopg2's connection context
    manager only commits or rolls back the transaction, it does not close
    the connection.
    """
    q = '''SELECT tsname, source, version, process_time, cache_time,
address_count, geometry_type, processed_url, cache_url,
sample_url, output_url, fingerprint
FROM stats
ORDER BY tsname DESC, source DESC
-- LIMIT 10
'''
    conn = connect(DASHBOARD_DB, cursor_factory=DictCursor)
    try:
        with conn:
            with conn.cursor() as db:
                db.execute(q)
                return list(db)
    finally:
        conn.close()
# Parameterized statements queued for every migrated set and run.
INSERT_SET_SQL = '''INSERT INTO sets
(id, commit_sha, datetime_start, datetime_end)
VALUES (%s, %s, %s, %s)'''

INSERT_RUN_SQL = '''INSERT INTO runs
(id, source_path, datetime_tz, state,
status, set_id, commit_sha)
VALUES (%s, %s, %s, %s, %s, %s, %s)'''


def last_modified_of(url):
    """Return the parsed Last-Modified time of `url`, or None.

    None is returned, without any network request, when `url` is empty.
    None is also returned when the HEAD response has no Last-Modified
    header, instead of handing None to the date parser.
    """
    if not url:
        return None
    header = head(url).headers.get('last-modified')
    return parse(header) if header else None


def set_duration(datetime_start, datetime_end):
    """Return `datetime_end - datetime_start`, or None if the end is unknown."""
    if datetime_end is None:
        return None
    return datetime_end - datetime_start


def run_state(run):
    """Build the state dictionary stored as JSON for one `stats` row.

    `run` is any mapping with the columns selected from the stats table.
    The two durations are rendered with str(timedelta(seconds=...)) and stay
    None when the column is empty or zero.
    """
    state = {
        'source': run['source'], 'version': run['version'],
        'sample': run['sample_url'], 'cache': run['cache_url'],
        'geometry type': run['geometry_type'], 'fingerprint': run['fingerprint'],
        'address count': run['address_count'], 'output': run['output_url'],
        'processed': run['processed_url'], 'cache time': None, 'process time': None
    }
    if run['cache_time']:
        state['cache time'] = str(timedelta(seconds=run['cache_time']))
    if run['process_time']:
        state['process time'] = str(timedelta(seconds=run['process_time']))
    return state


def print_id_ranges(db):
    """Print the smallest and largest id currently in `runs` and `sets`."""
    db.execute('SELECT MIN(id), MAX(id) FROM runs')
    print('runs ids: {} - {}'.format(*db.fetchone()))
    db.execute('SELECT MIN(id), MAX(id) FROM sets')
    print('sets ids: {} - {}'.format(*db.fetchone()))


if __name__ == '__main__':
    # Ids for migrated rows count downward, starting just below the
    # smallest existing job sequence number.
    conn = connect(MACHINE_DB)
    try:
        with conn:
            with conn.cursor() as db:
                db.execute('SELECT MIN(sequence) FROM jobs')
                (start, ) = db.fetchone()
    finally:
        # `with conn` only ends the transaction; close explicitly.
        conn.close()

    sequence = count(start-1, -1)
    sets = groupby(get_stats(), itemgetter('tsname'))
    queries = list()

    # Phase 1: gather everything (Github + HTTP lookups) into a list of
    # (sql, values) pairs so the database is touched in a single transaction.
    for (tsname, set_runs) in sets:
        print('--' * 40)
        set_id = next(sequence)
        datetime_start = datetime.fromtimestamp(float(tsname), tzutc())

        # First commit returned by Github for `until=<set start>`.
        commit_url = expand(COMMIT_URL, dict(until=datetime_start.isoformat()))
        commit_sha = get(commit_url, auth=github_auth).json()[0].get('sha')

        # The set's index page modification time stands in for its end time;
        # it may be unknown (None).
        states_url = expand(STATES_URL, dict(tsname=tsname))
        datetime_end = last_modified_of(states_url)

        queries.append((INSERT_SET_SQL,
                        (set_id, commit_sha, datetime_start, datetime_end)))

        print(datetime_start, commit_sha,
              set_duration(datetime_start, datetime_end))
        print('- ' * 40)

        for run in set_runs:
            state = run_state(run)

            # Computed fresh for every run: a run without an output URL gets
            # None rather than the previous run's timestamp.
            # NOTE(review): assumes runs.datetime_tz is nullable - confirm.
            datetime_tz = last_modified_of(run['output_url'])

            run_id = next(sequence)
            source_path = 'sources/{source}'.format(**run)
            status = bool(run['sample_url'] and run['cache_url'] and run['processed_url'])

            print(status, datetime_tz, source_path)

            queries.append((INSERT_RUN_SQL,
                            (run_id, source_path, datetime_tz, Json(state),
                             status, set_id, commit_sha)))

    print('--' * 40)

    # Phase 2: replace previously migrated rows (negative ids) in one
    # transaction, reporting id ranges before and after.
    conn = connect(MACHINE_DB, cursor_factory=DictCursor)
    try:
        with conn:
            with conn.cursor() as db:
                db.execute('DELETE FROM runs WHERE id < 0')
                db.execute('DELETE FROM sets WHERE id < 0')

                print_id_ranges(db)

                for (query, values) in queries:
                    db.execute(query, values)

                print_id_ranges(db)

                db.execute('SELECT * FROM runs ORDER BY id ASC LIMIT 1')
                print(db.fetchone())
                db.execute('SELECT * FROM sets ORDER BY id ASC LIMIT 1')
                print(db.fetchone())

                # Presumably a dry-run switch: an exception raised here
                # makes `with conn` roll the transaction back.
                # raise Exception('Nope')
    finally:
        conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.