@kuanb
Created April 14, 2017 22:26
Example of Dask variation of GeoDataFrame buffer aggregation
import sys, csv, time
from dask import dataframe as dd
import geopandas as gpd
import pandas as pd
from shapely.wkt import loads
# log/monitor performance
start_time = time.time()
def log(message):
    global start_time
    print('{:,.3f}s: {}'.format(time.time() - start_time, message))
    # reset the clock so each log line reports time since the previous one
    start_time = time.time()
# load in example geometry data
filepath = './example_geometries.csv'
init_df = pd.read_csv(filepath)
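# assumed input schema (inferred from the columns used below): an integer
# 'id' column, a numeric 'value' column, and a 'geometry' column holding
# WKT strings in the projected coordinate system declared further down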
length_options = [500, 1000, 5000, 10000, 20000]
for subset_length in length_options:
    # take a subset of the dataframe per increment
    print('\nRunning on subset length: ' + str(subset_length) + ' rows')
    df = init_df.head(subset_length)

    # =====================
    # First steps: prepare the GeoDataFrame
    # =====================

    # create a geometry GeoSeries from the geometry column's WKT strings
    crs = {'init': 'epsg:32154'}
    geometries = gpd.GeoSeries(df['geometry'].map(lambda x: loads(x)))

    # take a subset of the pandas df and convert to a gdf via geopandas
    gdf = gpd.GeoDataFrame(data=df[['id', 'value']], crs=crs, geometry=geometries)

    # ...now index the new gdf (id is integer dtype)
    gdf = gdf.set_index('id')

    # we'll do an aggregation based off of an arbitrary buffer distance
    gdf['buffer'] = gdf['geometry'].buffer(5000)
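    # note: the 5000 above is in the linear units of the data's CRS
    # (declared earlier as epsg:32154), not in degrees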
    print('\tFinished initial set up steps. Running Dask operation...')

    # convert from a GeoDataFrame to a dask dataframe
    dd_gdf = dd.from_pandas(gdf, npartitions=3)
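    # npartitions=3 splits the frame into three chunks; the row-wise apply
    # below is scheduled once per chunk, which is where any parallelism comes from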
    # =====================
    # Next: define and apply the lambda function
    # =====================

    # update the geometries variable so it carries the re-indexed gdf's index,
    # and precompute the gdf's spatial index
    geometries = gdf.geometry
    sidx = gdf.sindex

    # we want to update this column via a summarize operation
    dd_gdf['sum'] = 0.0
    # this is the summary/aggregation process we'll perform on each row
    def summarize(row, sidx, dgdf, usecompute):
        # dgdf is a copy of the full GeoDataFrame, passed in by the caller
        buffer = row['buffer']
        # utilize the precalculated spatial index to narrow down candidates
        possible_matches_index = list(sidx.intersection(buffer.bounds))
        possible_matches = dgdf.iloc[possible_matches_index]
        where_intersects = possible_matches.intersects(buffer)
        # rows outside the candidate set end up NaN here, so the == True
        # filter keeps only confirmed intersections
        dgdf['intersects'] = where_intersects
        precise_matches = dgdf[dgdf['intersects'] == True]
        summed = precise_matches['value'].sum()
        if usecompute is True:
            pass
            # summed = summed.compute()
        row['sum'] = summed
        return row
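    # equivalently, the precise-match step above could be written without
    # mutating the copied frame:
    #   precise_matches = possible_matches[where_intersects]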
    # use one computed row as the meta (output columns/dtypes) hint for apply
    meta = dd_gdf.loc[5758].compute()

    usecompute = True
    dd_sums_op = dd_gdf.apply(lambda row: summarize(row, sidx, gdf.copy(), usecompute), axis=1, meta=meta)
    dd_sums = dd_sums_op.compute()
    log('Finished Dask run.')
    # print(dd_sums[['value', 'sum']].head())

    print('\tRunning original operation...')
    usecompute = False
    gdf.apply(lambda row: summarize(row, sidx, gdf.copy(), usecompute), axis=1)
    log('Finished normal method run.')
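For comparison, the same aggregation can also be phrased with dask's map_partitions rather than a row-wise apply, so each pandas chunk is handed to an ordinary function. Below is a minimal sketch, assuming gdf, sidx, dd_gdf, and meta are built exactly as in the script above; summarize_partition is just an illustrative name, not part of the original gist.

def summarize_partition(part):
    # 'part' is one pandas chunk of dd_gdf; query the shared spatial index
    # for each buffer and sum the 'value' of every intersecting geometry
    sums = []
    for buff in part['buffer']:
        possible_index = list(sidx.intersection(buff.bounds))
        possible = gdf.iloc[possible_index]
        precise = possible[possible.intersects(buff)]
        sums.append(precise['value'].sum())
    part = part.copy()
    part['sum'] = sums
    return part

dd_sums_alt = dd_gdf.map_partitions(summarize_partition, meta=meta).compute()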