Skip to content

Instantly share code, notes, and snippets.

@cjw296
Created October 12, 2020 05:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cjw296/91f8a442195e8600cfbe154698d8d05f to your computer and use it in GitHub Desktop.
Save cjw296/91f8a442195e8600cfbe154698d8d05f to your computer and use it in GitHub Desktop.
import concurrent.futures
import json
import requests
from datetime import datetime, timedelta
from pathlib import Path
import matplotlib.pyplot as plt
import pandas as pd
from tqdm.notebook import tqdm
from urllib.parse import urlparse, parse_qs
# Field / filter names understood by the UK coronavirus dashboard API
# (https://api.coronavirus.data.gov.uk/v1/data).  Kept as module-level
# constants so query structures below can be built without magic strings.
area_code = 'areaCode'
area_name = 'areaName'
area_type = 'areaType'
date = 'date'
new_cases_by_specimen_date = 'newCasesBySpecimenDate'
new_deaths_by_death_date = 'newDeaths28DaysByDeathDate'
new_tests_by_publish_date = 'newTestsByPublishDate'
release_timestamp = 'releaseTimestamp'
# Valid values for the areaType filter:
ltla = 'ltla'
nation = 'nation'
overview = 'overview'
def get(filters, structure, **params):
    """Fetch one page of data from the UK coronavirus dashboard API.

    :param filters: mapping of filter name to value; sent to the API as a
        single semicolon-separated ``name=value`` string.
    :param structure: iterable of field names to request; each field is
        returned under its own name (identity mapping).
    :param params: extra query-string parameters, e.g. ``page=2``.
    :return: the decoded JSON body of the response.
    :raises ValueError: on any non-200 response.  The API frequently
        returns 5xx under load, so callers are expected to retry.
    """
    _params = {
        'filters': ';'.join(f'{k}={v}' for (k, v) in filters.items()),
        'structure': json.dumps({element: element for element in structure}),
    }
    _params.update(params)
    response = requests.get(
        'https://api.coronavirus.data.gov.uk/v1/data',
        timeout=20,
        params=_params,
    )
    if response.status_code != 200:
        # Include a snippet of the body so failures are diagnosable;
        # retry logic in query() only cares that an exception was raised.
        raise ValueError(f'{response.status_code}: {response.text[:200]}')
    return response.json()
def query(filters, structure, max_workers=None, **params):
    """Fetch every page of an API query concurrently into a DataFrame.

    Page 1 is fetched synchronously to learn the total page count from the
    pagination links; remaining pages are fetched in a thread pool.  Pages
    that fail (the API regularly 500s under load) are collected and retried
    in further rounds until every page has been fetched.

    :param filters: as for :func:`get`.
    :param structure: as for :func:`get`.
    :param max_workers: thread pool size; defaults to one thread per
        remaining page (fine for small queries, pass e.g. 10 for large ones).
    :param params: extra query-string parameters forwarded to :func:`get`.
    :return: a :class:`pandas.DataFrame` of all rows from every page.
    """
    page = 1
    response = get(filters, structure, page=page, **params)
    result = response['data']
    # The 'last' pagination link encodes the final page number in its
    # query string; parse it out to size the work.
    max_page = int(parse_qs(urlparse(response['pagination']['last']).query)['page'][0])
    if max_page > 1:
        # Progress bar (and its close()) only exist when there is more
        # than one page, so single-page queries skip this entirely.
        t = tqdm(total=max_page)
        t.update(1)  # page 1 already fetched above
        todo = range(2, max_page + 1)
        attempt = 0
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers or max_page - 1
        ) as executor:
            # Retry rounds: keep going until no page failed this round.
            while todo:
                attempt += 1
                bad = []
                t.set_postfix({'errors': len(bad), 'attempt': attempt})
                futures = {
                    executor.submit(get, filters, structure, page=page, **params): page
                    for page in todo
                }
                for future in concurrent.futures.as_completed(futures):
                    page = futures[future]
                    try:
                        response = future.result()
                    except Exception:
                        # Record the failed page for the next retry round.
                        bad.append(page)
                        t.set_postfix({'errors': len(bad), 'attempt': attempt})
                    else:
                        result.extend(response['data'])
                        t.update(1)
                todo = bad
        t.close()
    return pd.DataFrame(result)
# runtime ~1s
# Whole-UK overview: one row per date with cases, deaths and tests.
overview_data = query(
filters={area_type: overview},
structure=[release_timestamp, date, area_name, area_code,
new_cases_by_specimen_date, new_deaths_by_death_date, new_tests_by_publish_date]
)
# runtime ~1s
# Per-nation breakdown (England/Scotland/Wales/NI); no test counts — the
# newTestsByPublishDate field is only requested at overview level above.
nation_data = query(
filters={area_type: nation},
structure=[release_timestamp, date, area_name, area_code, new_cases_by_specimen_date, new_deaths_by_death_date]
)
# Display the DataFrame (notebook cell output).
nation_data
# at least 20s, usually 2-10 minutes, plenty of 500s that need retrying
# Lower-tier local authority data: many pages, so cap the thread pool at 10
# rather than letting query() default to one thread per page.
area_data = query(
filters={area_type: ltla},
structure=[release_timestamp, date, area_name, area_code, new_cases_by_specimen_date, new_deaths_by_death_date],
max_workers=10
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment