Skip to content

Instantly share code, notes, and snippets.

@cholmes
Last active June 8, 2023 16:01
Show Gist options
  • Save cholmes/d9ad24efe53fa5e094055ffb3e41979c to your computer and use it in GitHub Desktop.
import sqlalchemy
import geopandas as gpd
import dask_geopandas as dgd
import pandas as pd
import dask.dataframe
from shapely import wkb
import time
# Total number of rows expected in the buildings_unique table, and how many
# rows to pull per round-trip to Postgres (150M rows / 2.5M = 60 chunks).
TOTAL_ROWS = 150000000
CHUNKSIZE = 2500000
# Postgres connection parameters; main() assembles these into a
# SQLAlchemy connection URL.
params = {
'database': 'buildings',
'user': 'cholmes',
'password': 'your_password', # Replace with your actual password
'host': 'localhost',
'port': '5432', # Default Postgres port, adjust if needed
}
def load_chunk(engine, offset):
    """Fetch one chunk of rows from buildings_unique and return it as a
    single-partition dask-geopandas GeoDataFrame.

    Parameters
    ----------
    engine : sqlalchemy.Engine
        Open connection to the buildings database.
    offset : int
        Row offset of this chunk; the query reads CHUNKSIZE rows from here.
    """
    # ORDER BY id makes LIMIT/OFFSET pagination deterministic. Without it,
    # Postgres guarantees no row order, so successive chunks can repeat or
    # skip rows.
    chunk = pd.read_sql_query(
        f"""
        SELECT id, ST_AsEWKB(geometry) as geometry
        FROM buildings_unique
        ORDER BY id
        LIMIT {CHUNKSIZE} OFFSET {offset};
        """,
        engine,
    )
    # Decode the EWKB bytes returned by ST_AsEWKB into shapely geometries.
    chunk['geometry'] = chunk['geometry'].apply(bytes).apply(wkb.loads)
    # Wrap in a real GeoDataFrame before handing to dask_geopandas —
    # from_geopandas expects geopandas input, not a plain pandas DataFrame.
    gdf = gpd.GeoDataFrame(chunk, geometry='geometry')
    return dgd.from_geopandas(gdf, npartitions=1)
def main():
    """Stream the buildings_unique table out of Postgres chunk by chunk and
    write the combined result to a single GeoParquet file."""
    # Build the SQLAlchemy connection URL from the module-level params.
    engine = sqlalchemy.create_engine(
        f"postgresql://{params['user']}:{params['password']}"
        f"@{params['host']}:{params['port']}/{params['database']}"
    )
    print("Starting processing at " + str(pd.Timestamp.now()))
    # Collect one dask partition per chunk and concatenate ONCE at the end.
    # Concatenating inside the loop (ddf = concat([ddf, chunk])) rebuilds the
    # task graph on every iteration; an empty seed GeoDataFrame is also not
    # needed with this approach.
    partitions = []
    for offset in range(0, TOTAL_ROWS, CHUNKSIZE):
        # ORDER BY id makes LIMIT/OFFSET pagination deterministic; without it
        # Postgres may return rows in a different order per query, so chunks
        # could overlap or miss rows.
        chunk = pd.read_sql_query(
            f"""
            SELECT id, ST_AsEWKB(geometry) as geometry
            FROM buildings_unique
            ORDER BY id
            LIMIT {CHUNKSIZE} OFFSET {offset};
            """,
            engine,
        )
        # Decode EWKB bytes into shapely geometries.
        chunk['geometry'] = chunk['geometry'].apply(bytes).apply(wkb.loads)
        # Build a proper GeoDataFrame with the CRS set up front, so no
        # map_partitions/from_dask_dataframe geometry rebuild is needed later.
        gchunk = gpd.GeoDataFrame(chunk, geometry='geometry', crs="EPSG:4326")
        print(f"Time: {time.ctime()}, Rows Processed: {min(offset + CHUNKSIZE, TOTAL_ROWS)}")
        partitions.append(dgd.from_geopandas(gchunk, npartitions=1))
    gdf = dask.dataframe.concat(partitions)
    # Once the chunks have all been loaded and appended, write the result to parquet
    gdf.to_parquet('/Users/cholmes/Downloads/geo-data/google-buildings.parquet')
    print("Finished processing at " + str(pd.Timestamp.now()))
# Entry point: run the export only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment