Skip to content

Instantly share code, notes, and snippets.

@jwilson8767
Created February 19, 2019 19:05
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jwilson8767/62151d486a8c6dbf899854ddaff94468 to your computer and use it in GitHub Desktop.
Save jwilson8767/62151d486a8c6dbf899854ddaff94468 to your computer and use it in GitHub Desktop.
Geopandas concurrent sjoin
from concurrent.futures import as_completed, ProcessPoolExecutor
import geopandas as gpd
import pandas as pd
import numpy as np
from collections.abc import Sequence
from shapely import prepared
def sjoin(left_df, right_df, op='intersects', how='inner', lsuffix='left', rsuffix='right', fail_crs_mismatch: bool = True, fail_missing_geometries: bool = False) -> gpd.GeoDataFrame:
"""Spatial join of two GeoDataFrames. GeoPandas sjoin with concurrency (split naively using df slicing).
Parameters
----------
:param left_df: GeoDataFrame
:param right_df: GeoDataFrame
:param op: string, default 'intersection'
Binary predicate, one of {'intersects', 'contains', 'within'}.
See http://toblerity.org/shapely/manual.html#binary-predicates.
:param how: string, default 'inner'
The type of join:
* 'left': use keys from left_df; retain only left_df geometry column
* 'right': use keys from right_df; retain only right_df geometry column
* 'inner': use intersection of keys from both dfs; retain geometry column according to retain_geometry.
:param lsuffix: string, default 'left'
Suffix to apply to overlapping column names (left GeoDataFrame).
:param rsuffix: string, default 'right'
Suffix to apply to overlapping column names (right GeoDataFrame).
:param inner_index: string, default 'left'
Which index to retain for inner joins, one of {'left', 'right', 'both'}.
:param hide_progress: bool suppresses progress bars
:param fail_crs_mismatch: Whether to throw an exception when crs does not match exactly.
:param fail_missing_geometries: Whether to throw an exception when there are missing geometries.
"""
_validate('how', how)
_validate('op', op)
if left_df.empty:
raise ValueError("`left_df` is empty. Cannot perform spatial join.")
if right_df.empty:
raise ValueError("`right_df` is empty. Cannot perform spatial join.")
if fail_missing_geometries:
if pd.isnull(left_df.geometry).any():
raise ValueError('`left_df` has missing geometries.')
if pd.isnull(right_df.geometry).any():
raise ValueError('`right_df` has missing geometries.')
if left_df.crs != right_df.crs:
if fail_crs_mismatch:
raise AttributeError('CRS does not match! left_df: {}, right_df: {}'.format(left_df.crs, right_df.crs))
else:
print('Warning: CRS does not match!')
index_left = 'index_%s' % lsuffix
index_right = 'index_%s' % rsuffix
# due to GH 352
if (any(left_df.columns.isin([index_left, index_right]))
or any(right_df.columns.isin([index_left, index_right]))):
raise ValueError("'{0}' and '{1}' cannot be names in the frames being"
" joined".format(index_left, index_right))
# Attempt to re-use spatial indexes, otherwise generate the spatial index for the longer dataframe
if right_df._sindex_generated or (not left_df._sindex_generated and right_df.shape[0] > left_df.shape[0]):
tree_idx = right_df.sindex
tree_idx_right = True
else:
tree_idx = left_df.sindex
tree_idx_right = False
# the rtree spatial index only allows limited (numeric) index types, but an
# index in geopandas may be any arbitrary dtype. so reset both indices now
# and store references to the original indices, to be reaffixed later.
# GH 352
left_df = left_df.copy(deep=True)
left_df.index = left_df.index.rename(index_left)
left_df = left_df.reset_index()
right_df = right_df.copy(deep=True)
right_df.index = right_df.index.rename(index_right)
right_df = right_df.reset_index()
r_idx = np.empty([0, 0])
l_idx = np.empty([0, 0])
# get rtree spatial index
if tree_idx_right:
idxmatch = (left_df.geometry.apply(lambda a: a.bounds)
.apply(lambda a: list(tree_idx.intersection(a))))
idxmatch = idxmatch[idxmatch.apply(len) > 0]
# indexes of overlapping boundaries
if idxmatch.shape[0] > 0:
r_idx = np.concatenate(idxmatch.values)
l_idx = np.concatenate([[i] * len(v) for i, v in idxmatch.iteritems()])
else: # tree_idx_df == 'left':
idxmatch = (right_df.geometry.apply(lambda a: a.bounds)
.apply(lambda x: list(tree_idx.intersection(x))))
idxmatch = idxmatch[idxmatch.apply(len) > 0]
if idxmatch.shape[0] > 0:
# indexes of overlapping boundaries
l_idx = np.concatenate(idxmatch.values)
r_idx = np.concatenate([[i] * len(v) for i, v in idxmatch.iteritems()])
if len(r_idx) > 0 and len(l_idx) > 0:
matches = (
pd.DataFrame(
np.column_stack(
[
left_df[left_df.geometry.name][l_idx],
right_df[right_df.geometry.name][r_idx]
]))
)
sjoined = concurrent_map(fn=Op(op), iterable=matches)
result = (
pd.DataFrame(
np.column_stack(
[l_idx,
r_idx,
sjoined
])
)
)
result.columns = ['_key_left', '_key_right', 'match_bool']
result = pd.DataFrame(result[result['match_bool'] == 1]).drop('match_bool', axis=1)
else:
# when output from the join has no overlapping geometries
result = pd.DataFrame(columns=['_key_left', '_key_right'], dtype=float)
if how == 'inner':
result = result.set_index('_key_left')
joined = (
gpd.GeoDataFrame(
left_df
.merge(result, left_index=True, right_index=True)
.merge(right_df,
left_on='_key_right', right_index=True,
suffixes=('_%s' % lsuffix, '_%s' % rsuffix)),
crs=left_df.crs)
)
joined = joined.rename(columns={'geometry_%s' % lsuffix: 'geometry'})
del joined['geometry_%s' % rsuffix]
joined = joined.set_index(index_left).drop(['_key_right'], axis=1) # type: gpd.GeoDataFrame
joined.index.name = None
elif how == 'left':
result = result.set_index('_key_left')
joined = (
left_df
.merge(result, left_index=True, right_index=True, how='left')
.merge(right_df.drop(right_df.geometry.name, axis=1),
how='left', left_on='_key_right', right_index=True,
suffixes=('_%s' % lsuffix, '_%s' % rsuffix))
)
joined = joined.set_index(index_left).drop(['_key_right'], axis=1)
joined.index.name = None
else: # how == 'right':
joined = (
left_df
.drop(left_df.geometry.name, axis=1)
.merge(result.merge(right_df,
left_on='_key_right', right_index=True,
how='right'), left_index=True,
right_on='_key_left', how='right')
.set_index(index_right)
)
joined = joined.drop(['_key_left', '_key_right'], axis=1)
return joined
def concurrent_map(fn, iterable, chunk_size=256, **kwargs):
"""
Concurrently maps a function on a slice-able iterable with keyword arguments passed along.
:param fn: the function to be called on the iterables.
:param iterable: the list, NumPy array, DataFrame to call the function on.
:param chunk_size: splits the iterable into smaller chunks of this size or less.
:param kwargs: keyword arguments passed to the functions.
:return: the concatenated results of calling the function on the iterable.
"""
number_splits = (len(iterable) + chunk_size - 1) // chunk_size
splits = [x * chunk_size for x in range(number_splits)]
chunks = [iterable[skip:skip + chunk_size] for skip in splits]
chunks = [chunk.copy() for chunk in chunks]
with ProcessPoolExecutor() as executor:
try:
futures = [executor.submit(fn, args, **kwargs) for args in chunks]
for f in as_completed(futures):
# bubble up exceptions from workers
if f.exception() is not None:
raise f.exception()
continue
except Exception as ex:
executor.shutdown(wait=True)
raise ex
results = [future.result() for future in futures]
if len(results) > 0:
if isinstance(results[0], pd.core.generic.NDFrame):
return pd.concat(results)
elif isinstance(results[0], np.ndarray):
return np.concatenate(results)
merged = []
for chunk in results:
if isinstance(chunk, Sequence):
merged += chunk
return merged
sjoin_parameters = {
'how': ['left', 'right', 'inner'],
'op': ['intersects', 'within', 'contains']
}
def _validate(name, value):
if value not in sjoin_parameters[name]:
raise ValueError('{} was \'{}\' but is expected to be one of {}.'.format(name, value, sjoin_parameters[name]))
class Op:
def __init__(self, func):
self.func = func
def __call__(self, df):
return np.vectorize(globals()['_'+self.func])(df[0].apply(prepared.prep), df[1])
def _intersects(a1, a2):
return a1.intersects(a2)
def _contains(a1, a2):
return a1.contains(a2)
def _within(a1, a2):
return a1.within(a2)
if __name__ == '__main__':
a = gpd.read_file('a.shp')
b = gpd.read_file('b.shp')
print(a.shape)
print(b.shape)
c = sjoin(a, b, how='left')
print(c.shape)
@dlhvelasco
Copy link

Hi again, thank you for taking the time to answer my questions!
I was hoping to import this sjoin.py into my PointsPerPolygon.py so I could use the faster sjoin on two geodataframes: one called "points" and another "polygons".
However when I run PointsPerPolygon.py and calling sjoin.sjoin(points, polygons, how='left', op='within') in it, I am not getting any output.
I am also seeing "MainThread", "QueueManagerThread", and "QueueFeederThread" in my call stack, if that means anything.
How should I go about using your sjoin externally? Thanks again.

@jwilson8767
Copy link
Author

So let's check a few things:

  • If you run it with op='intersects' do you get a result?
  • Run the example at the bottom and post your results (specifically c.shape)
  • Please confirm you are using Python 3.6+ as I'm not sure concurrent.futures work the same prior to 3.6

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment