Skip to content

Instantly share code, notes, and snippets.

@reinderien
Created September 21, 2021 01:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save reinderien/aa823b8968e37b397934b36383f6405a to your computer and use it in GitHub Desktop.
Save reinderien/aa823b8968e37b397934b36383f6405a to your computer and use it in GitHub Desktop.
from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass
from os import path
from pathlib import Path
from typing import List, Dict, Optional, Any
from urllib.parse import urljoin
from bs4 import BeautifulSoup, SoupStrainer
import logging
import os
import re
import shutil
import zipfile
from httpx import Client
from requests import Session, Response
class URLNotFoundError(Exception):
    """Raised when a seeker cannot locate a download URL for its dataset."""
@dataclass
class Seeker(ABC):
    """Base class for dataset downloaders.

    Subclasses provide ``url`` (where to fetch from) and ``file_name``
    (what to save the download as under ``output_location``).
    """

    session: Session        # shared HTTP session used for all requests
    output_location: Path   # directory downloaded files are written into

    @property
    @abstractmethod
    def url(self) -> str:
        """URL of the file to download."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def file_name(self) -> str:
        """File name the download is saved under in output_location."""
        raise NotImplementedError()

    def get_soup(self, url: str, parse_only: Optional[SoupStrainer] = None) -> BeautifulSoup:
        """Fetch *url* as HTML and parse it, optionally restricted by *parse_only*."""
        html = self.get(url, headers={'Accept': 'text/html'}).text
        return BeautifulSoup(html, features='lxml', parse_only=parse_only)

    @staticmethod
    @contextmanager
    def log_errors(url: str):
        """Context manager: log any exception (with traceback) and re-raise."""
        try:
            yield
        except Exception:
            logging.error(
                f'Exception occurred when trying to download from {url}', exc_info=True,
            )
            raise

    def request(self, url: str, *args, **kwargs) -> Response:
        """Issue a request on the shared session; raise on HTTP error status."""
        with self.log_errors(url):
            # Positional args first for clarity (the original mixed
            # `url=url, *args`, which is legal but confusing to read).
            resp = self.session.request(*args, url=url, **kwargs)
            resp.raise_for_status()
            return resp

    def get(self, url: str, **kwargs) -> Response:
        """Convenience wrapper: GET request via self.request."""
        return self.request(url=url, method='get', **kwargs)

    def download_from_url(self) -> None:
        """Stream self.url to disk as self.file_name under output_location."""
        with self.get(self.url, stream=True) as resp:
            resp.raise_for_status()
            with open(self.output_location / self.file_name, 'wb') as output:
                shutil.copyfileobj(resp.raw, output)

    def move_files(self, paths: List[Path]) -> None:
        """Move each given path into output_location, logging any failure."""
        for src_path in paths:
            try:
                logging.info(f'Moving {src_path} to {self.output_location}')
                shutil.move(src_path, self.output_location)
            except Exception as ex:
                # BUG FIX: the original logged `{path}`, which is the
                # os.path module imported at file top, not the file moved.
                logging.error(f'Exception moving {src_path} to {self.output_location}: {ex}')
class HouseHoldMakeupSeeker(Seeker):
    """Downloads the household-makeup table from the Nomis census API as CSV."""

    @property
    def file_name(self) -> str:
        return 'house_hold_makeup.csv'

    @property
    def url(self) -> str:
        """Static Nomis API URL, assembled from its query components."""
        base = 'https://www.nomisweb.co.uk/api/v01/dataset/NM_538_1.data.csv'
        geography = (
            '1157627905...1157628910,1157629038...1157631399,'
            '1157631412...1157631520,1157631643...1157632624,'
            '1157632696...1157633851,1157633977...1157635939,'
            '1157628911...1157629037,1157631400...1157631411,'
            '1157631521...1157631642,1157632625...1157632695,'
            '1157633852...1157633976'
        )
        query = '&'.join((
            'date=latest',
            f'geography={geography}',
            'rural_urban=0,2...4,1,8,5,9,6,10,7',
            'cell=0...8',
            'measures=20100',
        ))
        return f'{base}?{query}'
class HouseHoldAgesSeeker(Seeker):
    """Downloads the ageing-population projections dataset from the ONS beta API."""

    # Dataset identifier searched for in the ONS catalogue listing.
    ID = 'ageing-population-projections'

    @property
    def file_name(self) -> str:
        return 'house_hold_ages.csv'

    @property
    def url(self) -> str:
        """Resolve the CSV download URL for the latest version of the dataset.

        Raises URLNotFoundError if the dataset id is absent from the catalogue.
        """
        with self.get(
            'https://api.beta.ons.gov.uk/v1/datasets',
            headers={'Accept': 'application/json'},
            params={'limit': 1000},
        ) as resp:
            data_sets = resp.json()['items']

        matched = next(
            (item for item in data_sets if item['id'] == self.ID), None,
        )
        if matched is None:
            raise URLNotFoundError()
        download_url = matched['links']['latest_version']['href']

        with self.get(download_url, headers={'Accept': 'application/json'}) as resp:
            return resp.json()['downloads']['csv']['href']
class HouseHoldIncomeSeeker(Seeker):
    """Scrapes the ONS page for the England & Wales median house price workbook."""

    SHEET_NAME = '2a'
    # Restrict parsing to the anchor whose aria-label marks the download link.
    LINK_STRAIN = SoupStrainer(
        'a', {'aria-label': re.compile(r'Download Median house prices')}
    )

    @property
    def file_name(self) -> str:
        # Maybe for some reason 'household_income_england_wales.csv' ? Why?
        return 'household_income_england_wales.xls'

    @property
    def url(self) -> str:
        """Find the workbook's download link on the dataset landing page."""
        dataset_url = (
            'https://www.ons.gov.uk/peoplepopulationandcommunity/housing/datasets'
            '/medianhousepricefornationalandsubnationalgeographies'
            'quarterlyrollingyearhpssadataset09'
        )
        page = self.get_soup(dataset_url, parse_only=self.LINK_STRAIN)
        anchor = page.find()
        # Link may be relative; resolve it against the page URL.
        return urljoin(dataset_url, anchor['href'])
class HouseHoldIncomeSeekerScotland(Seeker):
    """Downloads the Scottish median house price workbook.

    Requests has trouble with this one. Might be because it doesn't use
    HTTP 2. With httpx it's still awfully slow, but at least it completes.
    """

    SHEET_NAME = 'Table 4_median price'
    # Only parse the accordion container that holds the download links.
    ACCORDION_STRAIN = SoupStrainer('div', class_='accordion')

    @property
    def file_name(self) -> str:
        return 'household_income_scotland.xlsx'

    @property
    def url(self) -> str:
        """Scrape the statistics page for the workbook's download link."""
        soup = self.get_soup(
            'https://www.ros.gov.uk/data-and-statistics/house-price-statistics',
            parse_only=self.ACCORDION_STRAIN,
        )
        # The second top-level accordion section contains the wanted anchor.
        anchor = soup.find_all(recursive=False)[1].a
        return anchor['href']

    def download_from_url(self) -> None:
        """Download via httpx over HTTP/2 (see class docstring for why)."""
        url = self.url
        # BUG FIX: the httpx Client was never closed, leaking its connection
        # pool; use it as a context manager so it is released even on error.
        with self.log_errors(url), Client(http1=False, http2=True) as client:
            response = client.get(url)
            response.raise_for_status()
            with open(self.output_location / self.file_name, 'wb') as output:
                output.write(response.content)
class AddressHierarchySeekerV2(Seeker):
    """Downloads the ONS Postcode Directory archive via the ArcGIS search API."""

    # Search body POSTed to the ArcGIS v3 search API to find the ONSPD item.
    PARAMS: Dict[str, Any] = {
        'q': 'ONS Postcode directory',
        'sort': '-created',
        'agg': {'fields': 'downloadable,hasApi,source,tags,type,access'},
        'fields': {
            'datasets':
                'id,name,created,modified,modifiedProvenance,'
                'searchDescription,recordCount,source,extent,owner,'
                'thumbnailUrl,type,url,xFrameOptions,'
                'contentSecurityPolicy,siteUrl,tags,collection,size,'
                'initiativeCategories,slug,startDate,venue,initiativeId,'
                'initiativeTitle,organizers,isAllDay,onlineLocation,timeZone'
        },
        'filter': {'tags': 'any(onspd)', 'type': 'any(csv collection)'},
        'catalog': {
            'groupIds': 'any(b542daa9c43646ac96a7118d655d681d,2a3b355653cc4cd083e8c40a099f0d7d)'
        },
    }
    # Paths queued for moving by collect_file_name(); reset per pre_process run.
    file_names: List = []

    @property
    def file_name(self) -> str:
        return 'ons_postcode_directory.zip'

    @property
    def _extract_dir(self) -> Path:
        """Directory the zip archive is extracted into (archive name sans suffix)."""
        return self.output_location / Path(self.file_name).stem

    @property
    def url(self) -> str:
        """Query the ArcGIS search API for the newest matching item's download URL."""
        response = self.request(
            method='post',
            headers={'Accept': 'application/json'},
            url='https://opendata.arcgis.com/api/v3/search',
            params={
                'q': 'ONS Postcode directory',
                'sort': '-created',
                'tags': 'onspd',
                'type': 'csv collection',
            },
            json=self.PARAMS,
        ).json()
        # Renamed from `id` to avoid shadowing the builtin.
        item_id = response['data'][0]['id']
        return f'https://www.arcgis.com/sharing/content/items/{item_id}/data'

    def pre_process(self) -> None:
        """Extract the downloaded archive, move the wanted files out, clean up."""
        # BUG FIX: the original referenced nonexistent self.file/self.extension
        # attributes; derive the archive and extraction paths from file_name.
        archive = self.output_location / self.file_name
        with zipfile.ZipFile(archive, 'r') as zip_ref:
            zip_ref.extractall(self._extract_dir)
        # BUG FIX: file_names was shared class-level state; reset per run.
        self.file_names = []
        # BUG FIX: leading '/' made these absolute paths under pathlib joins.
        self.collect_file_name(
            'Documents', ' Census Output Area Classification Names and Codes UK.csv'
        )
        self.collect_file_name(
            'Documents', 'Westminster Parliamentary Constituency names and codes UK'
        )
        self.collect_file_name('Data', 'ONSPD')
        self.move_files(self.file_names)
        shutil.rmtree(self._extract_dir)
        # BUG FIX: the original os.remove targeted a path inside the tree it
        # had just deleted; remove the archive itself instead.
        os.remove(archive)

    def collect_file_name(self, directory: str, name: str) -> None:
        """Queue files under *directory* matching *name* for moving.

        Files ending with *name* are edition-prefixed ("<number> <name>");
        only the highest-numbered one is kept. Files starting with *name*
        and ending '.csv' are all kept.
        """
        # BUG FIX: the original used super().output_location (invalid — it is
        # an instance attribute) and never updated `latest`, so the last
        # match won rather than the highest-numbered edition.
        base = self._extract_dir / directory
        latest = 0
        latest_file: Optional[str] = None
        for entry in os.listdir(base):
            if entry.endswith(name):
                prefix = entry.split(' ')[0]
                if prefix.isdigit() and int(prefix) > latest:
                    latest = int(prefix)
                    latest_file = entry
            if entry.startswith(name) and entry.endswith('.csv'):
                self.file_names.append(base / entry)
        if latest_file is not None:
            self.file_names.append(base / latest_file)
def main() -> None:
    """Run every seeker, downloading each dataset into its own directory."""
    seeker_types = (
        HouseHoldMakeupSeeker,
        HouseHoldAgesSeeker,
        HouseHoldIncomeSeeker,
        HouseHoldIncomeSeekerScotland,
        AddressHierarchySeekerV2,
    )
    with Session() as session:
        for seeker_type in seeker_types:
            # Each seeker saves into a directory named after its class.
            destination = Path(seeker_type.__name__)
            destination.mkdir(exist_ok=True)
            seeker = seeker_type(session=session, output_location=destination)
            seeker.download_from_url()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment