open-buildings-dataset.ipynb (@owahltinez, last active September 25, 2023)
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/owahltinez/e1df3c41f0c90c54fe3546c5118ca019/open-buildings-dataset.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "code",
"source": [
"#@title Install dependencies\n",
"!pip install -q apache-beam s2sphere"
],
"metadata": {
"id": "pczSiVucwWeB"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"#@title Downloading Compressed CSV files\n",
"\n",
"import concurrent.futures\n",
"import functools\n",
"import io\n",
"import pandas as pd\n",
"import tensorflow as tf\n",
"from tqdm.notebook import tqdm\n",
"\n",
"def read_pandas_csv(url, **read_opts):\n",
" # This method is significantly faster for reading files stored in GCS.\n",
" with tf.io.gfile.GFile(url, mode='rb') as f:\n",
" return pd.read_csv(io.BytesIO(f.read()), **read_opts)\n",
" \n",
"# Get all S2 cell tokens that contain buildings data.\n",
"# NOTE: Reading files directly from GCS is faster than the http REST endpoint.\n",
"url_root = \"gs://open-buildings-data/v2\"\n",
"# url_root = \"https://storage.googleapis.com/open-buildings-data/v2\"\n",
"tokens = read_pandas_csv(f\"{url_root}/score_thresholds_s2_level_4.csv\").s2_token\n",
"\n",
"# The polygon type can be \"points\" (centroid) or \"polygon\" (footprint).\n",
"poly_type = \"points\" #@param [\"points\", \"polygons\"]\n",
" \n",
"# Create a list with all URLs that we must download data from.\n",
"fnames = [f\"{token}_buildings.csv.gz\" for token in tokens]\n",
"poly_path = f\"{url_root}/{poly_type}_s2_level_4_gzip\"\n",
"urls = [f\"{poly_path}/{fname}\" for fname in fnames]\n",
" \n",
"# Create a function that reads only a subset of fields given a URL.\n",
"columns = [\"latitude\", \"longitude\", \"confidence\"]\n",
"read_opts = dict(usecols=columns, compression='gzip')\n",
"map_func = functools.partial(read_pandas_csv, **read_opts)\n",
"\n",
"with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:\n",
" futures = [executor.submit(map_func, url) for url in urls]\n",
" completed = tqdm(concurrent.futures.as_completed(futures), total=len(futures))\n",
" table_iter = (future.result() for future in completed)\n",
" df = pd.concat(table_iter, copy=False, ignore_index=True)"
],
"metadata": {
"id": "eHEtjvHYm-Yp"
},
"execution_count": null,
"outputs": []
},
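{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal follow-up sketch: filter the concatenated `df` by the `confidence` column requested above. The 0.75 cutoff is an arbitrary example value, not a recommendation from the dataset."
]
},
{
"cell_type": "code",
"source": [
"#@title Filtering the downloaded data by confidence (sketch)\n",
"\n",
"# Assumes `df` from the previous cell, with the columns requested in\n",
"# `read_opts`: latitude, longitude and confidence.\n",
"min_confidence = 0.75  # Arbitrary example threshold.\n",
"high_confidence = df[df[\"confidence\"] >= min_confidence]\n",
"print(f\"{len(high_confidence):,} of {len(df):,} buildings kept\")"
],
"metadata": {},
"execution_count": null,
"outputs": []
},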
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "K0boHNi-_Iff"
},
"outputs": [],
"source": [
"#@title Downloading Earth Engine FeatureCollection\n",
"\n",
"import ee\n",
"\n",
"# This only needs to be done once in your script.\n",
"ee.Authenticate()\n",
"ee.Initialize()\n",
" \n",
"# Read the building polygons feature collection as-is.\n",
"buildings = ee.FeatureCollection('GOOGLE/Research/open-buildings/v2/polygons')\n",
"\n",
"# Download the first 10 and display them.\n",
"buildings.limit(10).getInfo()"
]
},
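{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged sketch of narrowing the FeatureCollection server-side before calling `getInfo()`. The example coordinates, buffer size, `confidence` property name and 0.75 threshold are illustrative assumptions; check the collection's schema in the Earth Engine Data Catalog."
]
},
{
"cell_type": "code",
"source": [
"#@title Filtering the FeatureCollection server-side (sketch)\n",
"\n",
"# Assumes `buildings` from the previous cell and an authenticated session.\n",
"# The region and the `confidence` property are illustrative assumptions.\n",
"region = ee.Geometry.Point([36.82, -1.29]).buffer(1000)  # ~1 km buffer.\n",
"subset = buildings.filterBounds(region).filter(ee.Filter.gte(\"confidence\", 0.75))\n",
"print(subset.size().getInfo())"
],
"metadata": {},
"execution_count": null,
"outputs": []
},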
{
"cell_type": "code",
"source": [
"#@title Random Sampling Using Pandas\n",
"\n",
"# DataFrame containing all data in-memory.\n",
"# df: pd.DataFrame = ...\n",
"\n",
"# Sample random elements from the dataset.\n",
"sample_size = 100_000\n",
"sample = df.sample(sample_size)"
],
"metadata": {
"id": "Xrp-Yyh0jNaK"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"#@title Addressing geospatial bias\n",
"\n",
"import pandas as pd\n",
"from geopy.distance import great_circle\n",
" \n",
"# Read all buildings within the S2 cell with token 0ef.\n",
"url_root = \"gs://open-buildings-data/v2\"\n",
"poly_path = f\"{url_root}/points_s2_level_4_gzip\"\n",
"df = read_pandas_csv(f\"{poly_path}/0ef_buildings.csv.gz\", compression=\"gzip\")\n",
"\n",
"# Helper function to extract latitude and longitude from building objects.\n",
"get_lat_lng = lambda x: (x[\"latitude\"], x[\"longitude\"])\n",
" \n",
"selection = []\n",
"sample_size = 1_000\n",
"threshold_meters = 500\n",
" \n",
"# Draw random samples until we have 1,000 buildings at least 500m apart.\n",
"# `DataFrame.sample(frac=1)` is a shortcut used to shuffle the dataset.\n",
"# NOTE: Don't do this unless you need very few samples.\n",
"for _, building in df.sample(frac=1).iterrows():\n",
" latlng = get_lat_lng(building)\n",
" # As soon as any distance does not meet the threshold, stop computing them.\n",
" distance_func = lambda x: great_circle(latlng, get_lat_lng(x)).meters\n",
" if all(distance_func(x) >= threshold_meters for x in selection):\n",
" selection.append(building.to_dict())\n",
" if len(selection) >= sample_size: break"
],
"metadata": {
"id": "AmIXoR7a7G7n"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"#@title Using geospatial hashing for sampling\n",
"\n",
"import s2sphere as s2\n",
" \n",
"# df: pd.DataFrame = ...\n",
"# get_lat_lng = lambda building: ...\n",
" \n",
"def s2_cell_at_lat_lnt(lat, lng, level=30):\n",
" \"\"\"Helper function to retrieve S2 cell of <level> at <lat,lng> coordinates.\"\"\"\n",
" latlng = s2.LatLng.from_degrees(lat, lng)\n",
" return s2.CellId.from_lat_lng(latlng).parent(level)\n",
" \n",
"cell_level = 14\n",
"sample_size = 1_000\n",
" \n",
"selection = []\n",
"cell_tokens = set()\n",
" \n",
"# Iterate over all (shuffled) buildings and add them to the selection if there\n",
"# are no hash collisions.\n",
"for _, building in df.sample(frac=1).iterrows():\n",
" latlng = get_lat_lng(building)\n",
" # Get the S2 cell token corresponding to <lat, lng>, which is fast to compute.\n",
" token = s2_cell_at_lat_lnt(*latlng, level=cell_level).to_token()\n",
" if token not in cell_tokens:\n",
" cell_tokens.add(token)\n",
" selection.append(building.to_dict())\n",
" if len(selection) >= sample_size: break"
],
"metadata": {
"id": "_FtGb1kM7jXV"
},
"execution_count": null,
"outputs": []
},
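{
"cell_type": "markdown",
"metadata": {},
"source": [
"A small inspection sketch: turn `selection` into a DataFrame. Because each sampled building occupies a distinct level-14 S2 cell, points in the sample are roughly spaced on the order of a few hundred meters apart."
]
},
{
"cell_type": "code",
"source": [
"#@title Inspecting the hashed sample (sketch)\n",
"\n",
"# Assumes `selection` and `cell_tokens` from the previous cell.\n",
"sample_df = pd.DataFrame(selection)\n",
"print(len(sample_df), \"buildings across\", len(cell_tokens), \"distinct S2 cells\")\n",
"sample_df.head()"
],
"metadata": {},
"execution_count": null,
"outputs": []
},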
{
"cell_type": "code",
"source": [
"#@title Scaling the processing Apache Beam\n",
"\n",
"import apache_beam as beam\n",
"\n",
"# Get all S2 cell tokens that contain buildings data.\n",
"# NOTE: Reading files directly from GCS is faster than the http REST endpoint.\n",
"url_root = \"gs://open-buildings-data/v2\"\n",
"# url_root = \"https://storage.googleapis.com/open-buildings-data/v2\"\n",
"tokens = read_pandas_csv(f\"{url_root}/score_thresholds_s2_level_4.csv\").s2_token\n",
" \n",
"# The polygon type can be \"points\" (centroid) or \"polygon\" (footprint).\n",
"poly_type = \"points\" #@param [\"points\", \"polygons\"]\n",
" \n",
"# Create a list with all URLs that we must download data from.\n",
"fnames = [f\"{token}_buildings.csv.gz\" for token in tokens]\n",
"poly_path = f\"{url_root}/{poly_type}_s2_level_4_gzip\"\n",
"urls = [f\"{poly_path}/{fname}\" for fname in fnames]\n",
" \n",
"# Save the header columns, which are shared across all clusters.\n",
"columns = read_pandas_csv(urls[0], compression=\"gzip\", nrows=0).columns\n",
"\n",
"def process_token_url(url, columns=None):\n",
" \n",
" cell_level = 14\n",
" sample_size = 1_000\n",
"\n",
" sample_count = 0\n",
" cell_tokens = set()\n",
"\n",
" # Iterate over all (shuffled) buildings and return them if there are\n",
" # no hash collisions.\n",
" df = read_pandas_csv(url, compression=\"gzip\")\n",
" for _, building in df.sample(frac=1).iterrows():\n",
" latlng = building[\"latitude\"], building[\"longitude\"]\n",
" # Get the S2 cell token corresponding to <lat, lng>.\n",
" token = s2_cell_at_lat_lnt(*latlng, level=cell_level)\n",
" if token not in cell_tokens:\n",
" sample_count += 1\n",
" cell_tokens.add(token)\n",
" # Return the building as a TSV line.\n",
" yield \"\\t\".join(str(building[col]) for col in columns)\n",
" # Once we reach our desired sample size, we can stop.\n",
" if sample_count >= sample_size: break\n",
"\n",
"# These options are used for testing purposes and work on Colab. To take\n",
"# advantage of parallel processing, you should adjust them to your resources.\n",
"opts = dict(direct_running_mode='multi_threading', direct_num_workers=8)\n",
"with beam.Pipeline(options=beam.pipeline.PipelineOptions(**opts)) as pipeline:\n",
" _ = (\n",
" pipeline\n",
" | beam.Create(urls)\n",
" | beam.FlatMap(process_token_url, columns=columns)\n",
" | beam.io.WriteToText(\"output/data\", \".tsv\", header=\"\\t\".join(columns))\n",
" )"
],
"metadata": {
"id": "8FiRAghN73tT"
},
"execution_count": null,
"outputs": []
}
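,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged sketch for reading the pipeline's sharded TSV output back into pandas. The glob pattern assumes `WriteToText`'s default shard naming (for example `output/data-00000-of-00008.tsv`); adjust it if you change the prefix or suffix."
]
},
{
"cell_type": "code",
"source": [
"#@title Reading the pipeline output back into pandas (sketch)\n",
"\n",
"import glob\n",
"\n",
"# Each shard is a TSV file that starts with the header row written above.\n",
"shards = glob.glob(\"output/data-*.tsv\")\n",
"sampled = pd.concat(\n",
"    (pd.read_csv(path, sep=\"\\t\") for path in shards), ignore_index=True)\n",
"sampled.head()"
],
"metadata": {},
"execution_count": null,
"outputs": []
}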
],
"metadata": {
"colab": {
"provenance": [],
"collapsed_sections": [],
"machine_shape": "hm",
"authorship_tag": "ABX9TyNdHqHQdUqrLV8tw3Tnk68N",
"include_colab_link": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}