Skip to content

Instantly share code, notes, and snippets.

@Thisisnotdalton
Last active October 9, 2022 08:11
Show Gist options
  • Save Thisisnotdalton/fefb4698df468651b83349b8dc578d73 to your computer and use it in GitHub Desktop.
Save Thisisnotdalton/fefb4698df468651b83349b8dc578d73 to your computer and use it in GitHub Desktop.
Examples of using Python 3.8+ `multiprocessing.shared_memory` to share pandas DataFrames and geopandas GeoDataFrames between processes.
import pickle
import typing
from multiprocessing.shared_memory import SharedMemory, ShareableList
import numpy as np
import pandas as pd
import geopandas as gpd
def create_shared_memory_of_size(shared_memory_name: str, minimum_size_bytes: int) -> SharedMemory:
    """Return a shared memory block called *shared_memory_name* of sufficient size.

    A new block is created when the name is free.  When the name is already
    taken, the existing block is reused if it holds at least
    *minimum_size_bytes*; otherwise it is closed, unlinked and created again
    with the requested size.

    Args:
        shared_memory_name: Name of the shared memory block.
        minimum_size_bytes: Smallest acceptable size of the block, in bytes.

    Returns:
        An open ``SharedMemory`` handle for the block.
    """
    try:
        return SharedMemory(name=shared_memory_name, size=minimum_size_bytes, create=True)
    except FileExistsError:
        # The name is in use: attach to whatever is already there.
        existing = SharedMemory(name=shared_memory_name, size=minimum_size_bytes)

    if existing.size >= minimum_size_bytes:
        return existing

    # Too small to reuse: drop the old block and allocate a fresh one.
    existing.close()
    existing.unlink()
    return SharedMemory(name=shared_memory_name, size=minimum_size_bytes, create=True)
def share_df(df: pd.DataFrame, df_id: str, extra_meta: typing.Optional[dict] = None) -> str:
    """Copy *df* into named shared memory blocks and publish its layout.

    Three blocks may be written:

    * ``f'{df_id}_columns'`` holds the raw values of every column (the index
      first, then the remaining columns in sorted order), back to back.
    * ``f'{df_id}_string_columns'`` holds a pickled ``{column: [values...]}``
      lookup table; it is only written when at least one object/string
      column exists.
    * ``df_id`` is a ``ShareableList`` describing the layout:
      ``[n_meta, key, value, ..., column, pickled_dtype, offset, ...]``.

    Object/string columns are stored as float codes that index into the
    column's lookup table.  The last entry of every lookup table is ``None``
    and is the code used for missing values (``None``/``NaN``/``NA``).

    Args:
        df: Table to share.  Its index is shared as the first column.
        df_id: Name of the metadata list; also the prefix of the buffer names.
        extra_meta: Optional additional key/value pairs stored in the
            metadata header.  Values must be storable in a ``ShareableList``.

    Returns:
        *df_id*, unchanged.
    """
    df_columns = sorted(df.columns)
    indexed_df = df.reset_index()
    # reset_index() adds the former index as a column; find it by set difference.
    index_name = list(set(indexed_df.columns).difference(df.columns))[0]
    df_columns.insert(0, index_name)
    length = len(df)
    column_memory_buffer_id = f'{df_id}_columns'
    meta = dict(column_memory_buffer_id=column_memory_buffer_id, columns=len(df_columns), length=length)
    if isinstance(extra_meta, dict):
        meta.update(extra_meta)

    column_data = []
    column_offsets = {}
    buffer_size = 0
    string_mappings = {}
    for column in df_columns:
        source_dtype = indexed_df[column].dtype
        if source_dtype == object or isinstance(source_dtype, pd.StringDtype):
            values = indexed_df[column]
            # Detect missing entries explicitly instead of by truthiness, so
            # that falsy-but-valid values such as '' are kept and NaN never
            # reaches sorted() together with strings.
            missing = values.isna().to_numpy()
            strings = sorted(set(values[~missing]))
            strings.append(None)
            none_code = len(strings) - 1
            string_to_index = {value: code for code, value in enumerate(strings)}
            codes = [
                none_code if is_missing else string_to_index[value]
                for value, is_missing in zip(values, missing)
            ]
            indexed_df[column] = np.asarray(codes, dtype='float')
            string_mappings[column] = strings
        column_dtype = indexed_df[column].dtype
        # Start each column on a boundary suitable for its dtype so readers
        # get aligned arrays.
        alignment = max(getattr(column_dtype, 'alignment', 1), 1)
        offset = -(-buffer_size // alignment) * alignment
        column_offsets[column] = offset
        column_data.extend([column, pickle.dumps(column_dtype), offset])
        buffer_size = offset + indexed_df[column].nbytes

    if string_mappings:
        strings_data = pickle.dumps(string_mappings)
        meta['string_mappings_buffer_id'] = f'{df_id}_string_columns'
        strings_shared_memory = create_shared_memory_of_size(
            meta['string_mappings_buffer_id'], minimum_size_bytes=len(strings_data))
        strings_shared_memory.buf[:len(strings_data)] = strings_data

    # A zero-sized segment cannot be created, so an empty table still gets 1 byte.
    shared_buffer = create_shared_memory_of_size(
        column_memory_buffer_id, minimum_size_bytes=max(buffer_size, 1))
    for column in df_columns:
        series = indexed_df[column]
        # Write through a temporary view so that no exported pointer into the
        # buffer outlives this statement.
        np.ndarray((length,), dtype=series.dtype, buffer=shared_buffer.buf,
                   offset=column_offsets[column])[:] = series.to_numpy()

    meta_list = [len(meta)]
    for key, value in meta.items():
        meta_list.extend([key, value])
    meta_list.extend(column_data)
    try:
        meta_shared_list = ShareableList(name=df_id, sequence=meta_list)
    except FileExistsError:
        # Replace metadata left over from an earlier share under the same id.
        meta_shared_list = ShareableList(name=df_id)
        meta_shared_list.shm.close()
        meta_shared_list.shm.unlink()
        meta_shared_list = ShareableList(name=df_id, sequence=meta_list)
    return df_id
def share_gdf(gdf: gpd.GeoDataFrame, df_id: str):
    """Share a GeoDataFrame by converting its geometry columns to WKT text.

    Every geometry column is replaced by its WKT representation and the
    resulting plain DataFrame is handed to ``share_df``.  The list of
    ``(column, crs)`` pairs is pickled into the ``geometry_columns`` metadata
    entry, with the active geometry column (if any) first.

    Args:
        gdf: GeoDataFrame to share.
        df_id: Identifier passed through to ``share_df``.

    Returns:
        Whatever ``share_df`` returns.
    """
    df = pd.DataFrame(gdf)
    extra_meta = {}
    geometry_columns = []
    # Ask the GeoDataFrame (not the plain copy) for its active geometry: the
    # column is not necessarily called 'geometry', and the accessor raises
    # AttributeError when no active geometry is set.
    try:
        active_geometry = gdf.geometry.name
    except AttributeError:
        active_geometry = None
    if active_geometry is not None:
        geometry_columns.append(active_geometry)
    for column in gdf.columns:
        if isinstance(gdf[column], gpd.GeoSeries) and column not in geometry_columns:
            geometry_columns.append(column)
    if geometry_columns:
        columns_with_crs = [(column, gdf[column].crs) for column in geometry_columns]
        for column in geometry_columns:
            df[column] = gdf[column].to_wkt()
        extra_meta['geometry_columns'] = pickle.dumps(columns_with_crs)
    return share_df(df, df_id, extra_meta=extra_meta)
def share_table(df: pd.DataFrame, df_id: str):
    """Share *df* under *df_id*, using the geo-aware path for GeoDataFrames.

    Args:
        df: A pandas DataFrame or geopandas GeoDataFrame.
        df_id: Identifier under which the table is shared.

    Returns:
        The result of ``share_gdf`` or ``share_df``.
    """
    sharer = share_gdf if isinstance(df, gpd.GeoDataFrame) else share_df
    return sharer(df, df_id)
def parse_table_meta(df_id: str) -> dict:
    """Read the metadata list stored under *df_id* and decode it into a dict.

    The list layout is ``[n_meta, key, value, ..., column, pickled_dtype,
    offset, ...]``: a count, that many key/value pairs, then one
    three-item entry per column (the column count is the ``'columns'`` value).

    In the returned dict ``'columns'`` is replaced by the ordered list of
    column names, ``'column_dtypes'`` maps each column to
    ``{'dtype': ..., 'offset': ...}`` and, when a
    ``'string_mappings_buffer_id'`` entry is present, ``'string_mappings'``
    holds the unpickled content of that buffer.

    NOTE: dtypes and string mappings are decoded with ``pickle.loads``; only
    use this with shared memory written by trusted processes.

    Args:
        df_id: Name of the ``ShareableList`` holding the metadata.

    Returns:
        The decoded metadata dictionary.
    """
    shared_list = ShareableList(name=df_id)
    try:
        meta_list = list(shared_list)
    finally:
        # The values were copied out; release our handle on the segment.
        shared_list.shm.close()

    meta_header_length = meta_list.pop(0)
    meta = {}
    for i in range(meta_header_length):
        key, value = meta_list[2 * i:2 * (i + 1)]
        meta[key] = value

    column_meta = meta_list[meta_header_length * 2:]
    column_entry_length = 3
    column_dtypes = {}
    columns = []
    for i in range(meta['columns']):
        column_name, column_dtype_pickle, buffer_offset = column_meta[
            column_entry_length * i: column_entry_length * (i + 1)]
        columns.append(column_name)
        column_dtypes[column_name] = dict(dtype=pickle.loads(column_dtype_pickle), offset=buffer_offset)
    meta['columns'] = columns
    meta['column_dtypes'] = column_dtypes

    string_mappings_buffer_id = meta.get('string_mappings_buffer_id')
    if isinstance(string_mappings_buffer_id, str):
        string_mappings_data = SharedMemory(name=string_mappings_buffer_id, create=False)
        try:
            # The buffer may be larger than the pickle; pickle.loads stops at
            # the end of the pickled object.
            meta['string_mappings'] = pickle.loads(bytes(string_mappings_data.buf))
        finally:
            string_mappings_data.close()
    return meta
def access_shared_df(df_id: str) -> typing.Tuple[pd.DataFrame, dict]:
    """Rebuild the DataFrame shared under *df_id*.

    Column values are copied out of the shared column buffer, string-coded
    columns are translated back through their lookup tables, and the first
    stored column is restored as the index.

    Args:
        df_id: Identifier the table was shared under.

    Returns:
        A ``(dataframe, meta)`` tuple, where *meta* is the dictionary
        produced by ``parse_table_meta``.
    """
    meta = parse_table_meta(df_id)
    column_data_buffer = SharedMemory(name=meta['column_memory_buffer_id'])
    try:
        # Copy every column out of the segment so the DataFrame never refers
        # to shared memory once our handle is closed.
        columns = {
            column_name: np.ndarray(
                shape=(meta['length'],),
                dtype=column_meta['dtype'],
                buffer=column_data_buffer.buf,
                offset=column_meta['offset'],
            ).copy()
            for column_name, column_meta in meta['column_dtypes'].items()
        }
    finally:
        column_data_buffer.close()

    # The arrays are already private copies; avoid copying them a second time.
    df = pd.DataFrame(columns, copy=False)
    string_mappings = meta.get('string_mappings')
    if isinstance(string_mappings, dict):
        for column_name, words in string_mappings.items():
            assert isinstance(words, list)
            # A missing code selects the last lookup entry.  pd.isna is
            # required here because NaN never compares equal to itself.
            df[column_name] = df[column_name].apply(
                lambda code, _words=words: _words[-1 if pd.isna(code) else int(code)])
    df = df.set_index(meta['columns'][0], drop=True)
    return df, meta
def access_shared_gdf(df_id: str) -> typing.Tuple[pd.DataFrame, dict]:
    """Load the table shared under *df_id*, restoring geometry columns if any.

    When the metadata has a non-empty ``geometry_columns`` entry it is
    unpickled into a list of ``(column, crs)`` pairs, each listed column is
    parsed from WKT, and the result is wrapped in a GeoDataFrame whose active
    geometry is the first listed column.  Otherwise the plain DataFrame is
    returned unchanged.

    Args:
        df_id: Identifier the table was shared under.

    Returns:
        A ``(table, meta)`` tuple.
    """
    df, meta = access_shared_df(df_id)
    pickled_geometry_columns = meta.get('geometry_columns')
    if not pickled_geometry_columns:
        return df, meta

    geometry_columns = pickle.loads(pickled_geometry_columns)
    meta['geometry_columns'] = geometry_columns
    for column, crs in geometry_columns:
        df[column] = gpd.GeoSeries.from_wkt(df[column], crs=crs)
    active_geometry = geometry_columns[0][0]
    return gpd.GeoDataFrame(df, geometry=active_geometry), meta
def access_shared_table(df_id: str) -> typing.Tuple[pd.DataFrame, dict]:
    """Load the table shared under *df_id* by delegating to ``access_shared_gdf``.

    Args:
        df_id: Identifier the table was shared under.

    Returns:
        The ``(table, meta)`` result of ``access_shared_gdf``.
    """
    result = access_shared_gdf(df_id)
    return result
def close_shared_table(df_id: str, unlink: bool = False):
    """Open, then close (and optionally unlink) every buffer of a shared table.

    The buffers are the column data buffer, the string mapping buffer when
    the metadata names one, and the metadata list itself.

    Args:
        df_id: Identifier the table was shared under.
        unlink: When true, also unlink each buffer after all were closed.
    """
    meta = parse_table_meta(df_id)

    handles = []
    for meta_key in ('column_memory_buffer_id', 'string_mappings_buffer_id'):
        buffer_name = meta.get(meta_key)
        if buffer_name:
            handles.append(SharedMemory(name=buffer_name, create=False))
    handles.append(ShareableList(name=df_id).shm)

    for handle in handles:
        handle.close()
    if not unlink:
        return
    for handle in handles:
        handle.unlink()
def dispose_shared_table(df_id: str):
    """Close and unlink every shared memory buffer of the table *df_id*.

    Args:
        df_id: Identifier the table was shared under.
    """
    close_shared_table(df_id=df_id, unlink=True)
def main():
    """Round-trip sample tables through shared memory and verify the result.

    Every dataset listed by ``gpd.datasets.available`` plus one synthetic
    numeric DataFrame is shared, read back and compared column by column.
    All tables that were shared are disposed of afterwards, even when a
    check fails.
    """
    # NOTE(review): this relies on the `geopandas.datasets` module being
    # available in the installed geopandas version.
    tests = {
        f'gdf_{i}': gpd.read_file(gpd.datasets.get_path(data_set))
        for i, data_set in enumerate(gpd.datasets.available)
    }

    df1 = pd.DataFrame()
    df1_columns = ['a', 'b']
    rows = 10000
    for i, column in enumerate(df1_columns):
        scaled = [row * (i + 1) for row in range(rows)]
        df1[f'{column}_int'] = np.array(scaled, dtype='i4')
        df1[f'{column}_long'] = np.array(scaled, dtype='i8')
        df1[f'{column}_float'] = np.array(scaled, dtype='f4')
        df1[f'{column}_double'] = np.array(scaled, dtype='f8')
        # NOTE(review): despite its name this column is numeric; dtype='f8'
        # parses the digit strings into floats.  Kept as in the original.
        df1[f'{column}_string'] = np.array(
            [str(value) * (1 + (i % 10)) for value in scaled], dtype='f8')
    for column in ('a_string', 'a_float', 'a_double'):
        df1.loc[df1.index.to_series() % 3 == 0, column] = None
        df1.loc[df1.index.to_series() % 3 == 1, column] = np.nan
    tests['test_df1'] = df1

    shared_memory_objects = []
    try:
        for table_id, table_df in tests.items():
            share_table(table_df, table_id)
            shared_memory_objects.append(table_id)
            shared_df, _meta = access_shared_table(table_id)
            assert isinstance(shared_df, pd.DataFrame)
            assert len(shared_df) == len(table_df)
            assert set(shared_df.columns) == set(table_df.columns), f'Mismatched columns: {set(shared_df.columns)} != {set(table_df.columns)}'
            for column in table_df.columns:
                if isinstance(shared_df[column], gpd.GeoSeries):
                    geometries1 = shared_df[column].to_wkt()
                    geometries2 = table_df[column].to_wkt()
                    assert geometries1.equals(geometries2), f'Column {column} does not match: {geometries1} != {geometries2}'
                else:
                    assert shared_df[column].equals(table_df[column]), f'Column {column} does not match: {shared_df[column]} != {table_df[column]}'
    finally:
        # Only dispose of what was actually shared, and do it even when a
        # check above failed, so no shared memory segments are left behind.
        for table_id in shared_memory_objects:
            dispose_shared_table(table_id)
# Run the round-trip self-test only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment