{
"cells": [
{
"cell_type": "markdown",
"id": "79a3b7f3-a2ce-4fe6-b487-cf18eadd49af",
"metadata": {},
"source": [
"# GNAF ingestion\n",
"\n",
"This jupyter noteebook ingests the [Geocoded National Address File][gnaf] ([GNAF][gnaf]) from [data.gov.au](data.gov.au). It loads it into a PostgreSQL database in a docker container, treating it like a disposable sqlite data store.\n",
"\n",
"## Note\n",
"\n",
"- Make sure docker is running first.\n",
"\n",
"## About\n",
"\n",
"### Overview\n",
"\n",
"1. Download compact assets from [data.gov.au](data.gov.au)\n",
"2. Create docker container `temp_gnaf_db` from `postgres:latest` image.\n",
"3. Create database from schema in data.gov.au assets.\n",
"\n",
"### Warning\n",
"\n",
"Do not connect this to another database unless you've taken the time to update this, as it'll drop the existing database. I suggest instead take what you need from this script and disregard the rest. DO NOT USE DATABASE CREDENTIALS HERE FOR ANY OTHER STORE (especailly anything with drop permissions).\n",
"\n",
"It also executes sql from a zip file downloaded from an external source.\n",
"\n",
"[gnaf]: https://data.gov.au/data/dataset/geocoded-national-address-file-g-naf\n",
"\n",
"## Download resources\n",
"\n",
"First lets download the compressed zip and unpack into a directory which we'll later read from. This will contain SQL for the database schema as well as data in pipe seperated files (`.psv`, like a `csv` with `|` in it)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "320cd7fe-f03d-4230-9e0f-a2ef302a91b2",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from urllib.request import urlretrieve\n",
"from zipfile import ZipFile, BadZipFile\n",
"\n",
"web_dst = 'web-out/gnaf-2020.zip'\n",
"zip_dst = 'zip-out/gnaf-2020'\n",
"gnaf_2020_url = 'https://data.gov.au/data/dataset/19432f89-dc3a-4ef3-b943-5326ef1dbecc/resource/1c685b96-9297-4b62-888e-c981790d332f/download/g-naf_may24_allstates_gda2020_psv_1015.zip'\n",
"\n",
"# create workspace directories\n",
"for d in ['web-out', 'zip-out', zip_dst]:\n",
" if not os.path.isdir(d):\n",
" os.mkdir(d) \n",
"\n",
"# download if not already downloaded\n",
"if not os.path.isfile(web_dst):\n",
" print(\"downloading zip\")\n",
" urlretrieve(gnaf_2020_url, web_dst)\n",
"else:\n",
" print(\"already downloaded zip\")\n",
"\n",
"# lets check if we've already unzipped it\n",
"with os.scandir(zip_dst) as it:\n",
" for entry in it:\n",
" print(\"zip already extracted\")\n",
" is_zip_dst_empty = False\n",
" break\n",
" else:\n",
" print(\"zip not yet extracted\")\n",
" is_zip_dst_empty = True\n",
"\n",
"if is_zip_dst_empty:\n",
" with ZipFile(web_dst, 'r') as z:\n",
" try:\n",
" print(\"extracting zip\")\n",
" z.extractall(zip_dst)\n",
" except BadZipFile as e:\n",
" print(f'failed to unzip, {web_dst} to {zip_dst} {e}')\n",
" raise e\n"
]
},
{
"cell_type": "markdown",
"id": "7e0638aa-a9dd-4fe9-9ff8-50ef084ac0dd",
"metadata": {},
"source": [
"## Setup Container & Reset Database\n",
"\n",
"This notebook this is designed to be run more than once, so it'll throw away any existing container and database before creating a new one. After getting rid of any container using the same identifer, it'll create a new one and pull the relevant image if it's not already installed. It'll wait till the postgres instance is live then create the database. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4f709233-e243-468b-971e-df23bfcc0113",
"metadata": {},
"outputs": [],
"source": [
"import docker\n",
"import psycopg2\n",
"import subprocess\n",
"import time\n",
"\n",
"from psycopg2 import OperationalError\n",
"\n",
"\n",
"dbname = 'temp_gnaf_db'\n",
"dbconf = { 'user': 'postgres', 'password': 'throw away password', 'host': 'localhost', 'port': 5434 }\n",
"db_url = f\"postgresql+psycopg2://{dbconf['user']}:{dbconf['password']}@{dbconf['host']}:{dbconf['port']}/{dbname}\"\n",
"container_name = 'temp_gnaf_container'\n",
"image_name = 'postgres:latest'\n",
"\n",
"try:\n",
" client = docker.from_env()\n",
"except docker.errors.DockerException as e:\n",
" print(f\"Error initializing Docker client: {e}\")\n",
" raise e\n",
"\n",
"# If it exists, find existing docker container and tear it down,\n",
"# we're starting from scratch so that's okay.\n",
"try:\n",
" container = client.containers.get(container_name)\n",
" container.stop()\n",
" container.remove()\n",
"except docker.errors.NotFound:\n",
" # do nothing is already running\n",
" pass\n",
"\n",
"# Pull the PostgreSQL image\n",
"client.images.pull(image_name)\n",
"\n",
"# Start a new PostgreSQL container\n",
"container = client.containers.run(\n",
" image_name,\n",
" name=container_name,\n",
" environment={'POSTGRES_PASSWORD': dbconf['password']},\n",
" ports={'5432/tcp': dbconf['port']},\n",
" detach=True\n",
")\n",
"\n",
"start_time = time.time()\n",
"timeout=60\n",
"interval=5\n",
"\n",
"# here we are waiting till the DB can connect, the reason\n",
"# for this as it may take some time for the container to\n",
"# start and for postgres to start responding. Otherwise\n",
"# the script will crash next time you call the database.\n",
"while True:\n",
" print(\"polling database\")\n",
" try:\n",
" conn = psycopg2.connect(dbname='postgres', **dbconf)\n",
" conn.close()\n",
" break\n",
" except OperationalError as e:\n",
" if time.time() - start_time > timeout:\n",
" print(\"Failed to connect to the database within the timeout period.\")\n",
" raise e\n",
" time.sleep(interval)\n",
"\n",
"subprocess.run(f\"\"\"\n",
"PGPASSWORD=\"{dbconf['password']}\" psql -U {dbconf['user']} -h {dbconf['host']} -p {dbconf['port']} -c \"DROP DATABASE IF EXISTS {dbname};\"\n",
"PGPASSWORD=\"{dbconf['password']}\" psql -U {dbconf['user']} -h {dbconf['host']} -p {dbconf['port']} -c \"CREATE DATABASE {dbname};\"\n",
"\"\"\", shell=True, check=True)\n",
"\n",
"for script in [\n",
" 'zip-out/gnaf-2020/G-NAF/Extras/GNAF_TableCreation_Scripts/create_tables_ansi.sql',\n",
" 'zip-out/gnaf-2020/G-NAF/Extras/GNAF_TableCreation_Scripts/add_fk_constraints.sql'\n",
"]:\n",
" with psycopg2.connect(dbname=dbname, **dbconf) as conn:\n",
" cursor = conn.cursor()\n",
" with open(script, 'r') as sql_file:\n",
" cursor.execute(sql_file.read())\n",
" cursor.close()"
]
},
{
"cell_type": "markdown",
"id": "e4eb2c4d-537f-4bd4-a95c-916fc71f9a11",
"metadata": {},
"source": [
"## Data Ingestion\n",
"\n",
"The main thing to consider with data ingestion is the order in which it is ingested. Now you could actually add the foreign key constraints after populating the database, and go nuts (That might actually even be faster than what I got here). But after a day of different variants of this script while trying to juggle correctness of data ingested and speed, I'm settling for this.\n",
"\n",
"So 90% of the code here is just coordinating the dependencies and order in which everything is ingested, as well as doing as much in parallel as possible. My earlier approach of doing everything sequentially took 6 hours, is between 1-2 hours. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "166f8e85-bf6c-4ddc-8a8f-946fa2285db1",
"metadata": {},
"outputs": [],
"source": [
"import csv\n",
"import datetime\n",
"import glob\n",
"import os\n",
"import psycopg2\n",
"import concurrent.futures\n",
"\n",
"from collections import defaultdict, deque\n",
"from datetime import datetime\n",
"from threading import Lock\n",
"\n",
"WORKER_COUNT = 32\n",
"BATCH_SIZE = 2000\n",
"\n",
"def get_table_name(file):\n",
" file = os.path.splitext(os.path.basename(file))[0]\n",
" sidx = 15 if file.startswith('Authority_Code') else file.find('_')+1\n",
" return file[sidx:file.rfind('_')]\n",
"\n",
"def get_batches(batch_size, reader):\n",
" batch = []\n",
" for row in reader:\n",
" row = [(None if v == \"\" else v) for v in (v.strip() for v in row)]\n",
" batch.append(row)\n",
" \n",
" if len(batch) >= batch_size:\n",
" yield batch\n",
" batch = [] \n",
" if batch:\n",
" yield batch\n",
"\n",
"def populate_file(file):\n",
" table_name = get_table_name(file)\n",
" with psycopg2.connect(dbname=dbname, **dbconf) as conn:\n",
" cursor = conn.cursor()\n",
" with open(file, 'r') as f:\n",
" time = datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n",
" print(f\"{time} Populating from {os.path.basename(file)}\")\n",
" reader = csv.reader(f, delimiter='|')\n",
" headers = next(reader)\n",
" insert_query = f\"\"\"\n",
" INSERT INTO {table_name} ({', '.join(headers)}) \n",
" VALUES ({', '.join(['%s'] * len(headers))})\n",
" ON CONFLICT DO NOTHING\n",
" \"\"\"\n",
" \n",
" for batch_index, batch in enumerate(get_batches(BATCH_SIZE, reader)):\n",
" try:\n",
" cursor.executemany(insert_query, batch)\n",
" except Exception as e:\n",
" print(f\"Error inserting batch {batch_index + 1} into {table_name}: {e}\")\n",
" raise e\n",
" conn.commit()\n",
"\n",
"def get_ordered_files(dependencies, all_files):\n",
" file_sizes = { file: os.path.getsize(file) for file in all_files }\n",
" total_blocked_sizes = defaultdict(int)\n",
" \n",
" def dfs(file, visited):\n",
" if file in visited:\n",
" return 0\n",
" visited.add(file)\n",
" total_size = file_sizes[file]\n",
" for dep in dependencies[file]:\n",
" total_size += dfs(dep, visited)\n",
" return total_size\n",
" \n",
" for file in dependencies:\n",
" visited = set()\n",
" total_blocked_sizes[file] = dfs(file, visited)\n",
" \n",
" # Sort files based on the total blocked sizes, with higher sizes first\n",
" return sorted(all_files, key=lambda f: total_blocked_sizes[f], reverse=True)\n",
"\n",
"def worker(file_queue, all_files, dependency_count, lock, dependency_completed):\n",
" while True:\n",
" with lock:\n",
" if not file_queue:\n",
" break\n",
" file = file_queue.pop()\n",
" \n",
" populate_file(file)\n",
" \n",
" with lock:\n",
" dependency_completed.add(file)\n",
" \n",
" for d in dependency_count:\n",
" if file in dependency_count[d]:\n",
" dependency_count[d].remove(file)\n",
" \n",
" ready_files = [f for f in all_files if not dependency_count[f]]\n",
" \n",
" for ready_file in ready_files:\n",
" if ready_file not in file_queue:\n",
" file_queue.append(ready_file)\n",
" all_files.remove(ready_file)\n",
"\n",
"\n",
"authority_files = glob.glob('zip-out/gnaf-2020/G-NAF/G-NAF MAY 2024/Authority Code/*.psv')\n",
"standard_prefix = 'zip-out/gnaf-2020/G-NAF/G-NAF MAY 2024/Standard'\n",
"standard_files = [\n",
" f'{standard_prefix}/{s}_{t}_psv.psv' \n",
" for t in [\n",
" 'STATE', 'ADDRESS_SITE', 'MB_2016', 'MB_2021', 'LOCALITY',\n",
" 'LOCALITY_ALIAS', 'LOCALITY_NEIGHBOUR', 'LOCALITY_POINT',\n",
" 'STREET_LOCALITY', 'STREET_LOCALITY_ALIAS', 'STREET_LOCALITY_POINT',\n",
" 'ADDRESS_DETAIL', 'ADDRESS_SITE_GEOCODE', 'ADDRESS_ALIAS', \n",
" 'ADDRESS_DEFAULT_GEOCODE', 'ADDRESS_FEATURE', \n",
" 'ADDRESS_MESH_BLOCK_2016', 'ADDRESS_MESH_BLOCK_2021',\n",
" 'PRIMARY_SECONDARY',\n",
" ] \n",
" for s in ['NSW', 'VIC', 'QLD', 'WA', 'SA', 'TAS', 'NT', 'OT', 'ACT'] \n",
"]\n",
"\n",
"standard_deps = { f: set() for f in authority_files } | { \n",
" f'{p}/{s}_{t}_psv.psv': { f'{p}/{s}_{d}_psv.psv' for d in ds } | set(authority_files)\n",
" \n",
" for t, ds in ({\n",
" 'STATE': [],\n",
" 'ADDRESS_SITE': ['STATE'],\n",
" 'MB_2016': [],\n",
" 'MB_2021': [],\n",
" 'LOCALITY': ['STATE'],\n",
" 'LOCALITY_ALIAS': ['LOCALITY'],\n",
" 'LOCALITY_NEIGHBOUR': ['LOCALITY'],\n",
" 'LOCALITY_POINT': ['LOCALITY'],\n",
" 'STREET_LOCALITY': ['LOCALITY'],\n",
" 'STREET_LOCALITY_ALIAS': ['STREET_LOCALITY'],\n",
" 'STREET_LOCALITY_POINT': ['STREET_LOCALITY'],\n",
" 'ADDRESS_DETAIL': ['ADDRESS_SITE', 'STATE', 'LOCALITY', 'STREET_LOCALITY'],\n",
" 'ADDRESS_SITE_GEOCODE': ['ADDRESS_SITE'],\n",
" 'ADDRESS_ALIAS': ['ADDRESS_DETAIL'],\n",
" 'ADDRESS_DEFAULT_GEOCODE': ['ADDRESS_DETAIL'],\n",
" 'ADDRESS_FEATURE': ['ADDRESS_DETAIL'],\n",
" 'ADDRESS_MESH_BLOCK_2016': ['ADDRESS_DETAIL', 'MB_2016'],\n",
" 'ADDRESS_MESH_BLOCK_2021': ['ADDRESS_DETAIL', 'MB_2021'],\n",
" 'PRIMARY_SECONDARY': ['ADDRESS_DETAIL'],\n",
" }).items()\n",
" for s in ['NSW', 'VIC', 'QLD', 'WA', 'SA', 'TAS', 'NT', 'OT', 'ACT'] \n",
" for p in [standard_prefix]\n",
"}\n",
"\n",
"lock = Lock()\n",
"all_files = { *authority_files, *standard_files }\n",
"dependency_count = {k: set(v) for k, v in standard_deps.items()}\n",
"dependency_completed = set()\n",
"file_queue = deque()\n",
"file_queue.extend(f for f in get_ordered_files(dependency_count, all_files) if not dependency_count[f])\n",
"\n",
"with concurrent.futures.ThreadPoolExecutor(max_workers=WORKER_COUNT) as executor:\n",
" futures = [executor.submit(worker, file_queue, all_files, dependency_count, lock, dependency_completed) for _ in range(WORKER_COUNT)]\n",
" for future in concurrent.futures.as_completed(futures):\n",
" future.result()\n"
]
},
{
"cell_type": "markdown",
"id": "46ede410-b1b3-494b-a51d-67e74c110753",
"metadata": {},
"source": [
"## Check contents\n",
"\n",
"Lastly lets check the contents, there should be at least something here."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "23e887af-b30e-40da-ae8b-ae0c6d779816",
"metadata": {},
"outputs": [],
"source": [
"with psycopg2.connect(dbname=dbname, **dbconf) as conn:\n",
" cursor = conn.cursor()\n",
"\n",
" # Get the list of all tables\n",
" cursor.execute(\"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'\")\n",
" tables = cursor.fetchall()\n",
"\n",
" # Get row count for each table\n",
" for table in tables:\n",
" cursor.execute(f'SELECT COUNT(*) FROM {table[0]}')\n",
" count = cursor.fetchone()[0]\n",
" print(f\"Table {table[0]} has {count} rows\")\n",
"\n",
" cursor.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d33bc0f9-e772-422a-9c7c-0d743f71d8f4",
"metadata": {},
"outputs": [],
"source": [
"container.stop()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "90d928d6-f032-4cd9-bc06-a31915dfe6a1",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}