SharedMemoryManager
"""
System/Runtime Requirements:
Python3.7
Linux / Mac
Must pip install:
numpy, pandas
Must build from source:
shared_memory (https://github.com/SleepProgger/py_shared_memory)
NOTE: Make sure to comment out the libraries=["rt"] extensions keyword argument
in setup.py to be able to build on Mac
Must pip install to run the tests:
seaborn
"""
from __future__ import annotations
import dataclasses as dc
import json
import multiprocessing as mp
import time
from typing import Tuple, Optional, List
import numpy as np
import pandas as pd
import shared_memory as sm
NamesT = Optional[List[str]]
class GlobalArrayMetadata:
shmname2shape = dict()
shmname2dtype = dict()
shmname2names = dict()
@classmethod
def set_shape(cls, shmname: str, shape: Tuple[int, ...]) -> None:
cls.shmname2shape[shmname] = shape
@classmethod
def set_dtype(cls, shmname: str, dtype: np.dtype) -> None:
cls.shmname2dtype[shmname] = dtype
@classmethod
def set_names(cls, shmname: str, names: NamesT) -> None:
cls.shmname2names[shmname] = names
@classmethod
def get_shape(cls, shmname: str) -> Tuple[int, ...]:
return cls.shmname2shape[shmname]
@classmethod
def get_dtype(cls, shmname: str) -> np.dtype:
return cls.shmname2dtype[shmname]
@classmethod
def get_names(cls, shmname: str) -> NamesT:
return cls.shmname2names[shmname]
@dc.dataclass(frozen=True)
class SharedDFArrNames:
columns: str
index: str
values: List[str]
dtypes: str
def ensure_vanilla_forking() -> None:
method = mp.get_start_method(allow_none=True)
if method:
assert method == "fork", method
else:
print('Executing `multiprocessing.set_start_method("fork")`')
mp.set_start_method("fork")
class SharedMemoryManager:
def __init__(self, is_child: bool = False):
self.is_child = is_child
self.shmname2shm = dict()
# So the class attrs of `GlobalArrayMetadata` are readable in child procs
ensure_vanilla_forking()
def __enter__(self) -> SharedMemoryManager:
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()
def shutdown(self):
for shm in self.shmname2shm.values():
shm.close()
if not self.is_child:
shm.unlink()
def SharedMemory(self, name=None, create=False, size=0) -> sm.SharedMemory:
shm = sm.SharedMemory(name=name, create=create, size=size)
self.shmname2shm[shm.name] = shm
return shm
@classmethod
def set_shape(cls, name: str, shape: Tuple[int, ...]) -> None:
GlobalArrayMetadata.set_shape(name, shape)
@classmethod
def set_dtype(cls, name: str, dtype: np.dtype) -> None:
GlobalArrayMetadata.set_dtype(name, dtype)
@classmethod
def set_names(cls, shmname: str, names: NamesT) -> None:
GlobalArrayMetadata.set_names(shmname, names)
@classmethod
def get_shape(cls, name: str) -> Tuple[int, ...]:
return GlobalArrayMetadata.get_shape(name)
@classmethod
def get_dtype(cls, name: str) -> np.dtype:
return GlobalArrayMetadata.get_dtype(name)
@classmethod
def get_names(cls, shmname: str) -> NamesT:
return GlobalArrayMetadata.get_names(shmname)
def get_shared_array_names(self, shmname: str) -> NamesT:
return self.get_names(shmname)
def get_shared_ndarray(self, shmname: str, shape=None, dtype=None) -> np.ndarray:
existing_shm = self.SharedMemory(name=shmname, create=False)
return np.ndarray(
shape or self.get_shape(shmname),
dtype=dtype or self.get_dtype(shmname),
buffer=existing_shm.buf,
)
def get_shared_dataframe(self, shm_names: SharedDFArrNames) -> pd.DataFrame:
columns_arr = self.get_shared_ndarray(shm_names.columns)
index_arr = self.get_shared_ndarray(shm_names.index)
values_arrs = [self.get_shared_ndarray(shmname) for shmname in shm_names.values]
dtypes_arr = self.get_shared_ndarray(shm_names.dtypes)
if len(set(dtypes_arr)) == 1:
# The shared dataframe's values are editable in-place.
# Update the underlying shared np.ndarray through `df.iloc[.., ..] = ..`
df = pd.DataFrame(
values_arrs[0],
columns=columns_arr,
index=index_arr,
copy=False,
dtype=dtypes_arr[0],
)
else:
values = dict(zip(columns_arr, values_arrs))
df = pd.DataFrame(values, columns=columns_arr, index=index_arr)
df = df.astype(dict(zip(columns_arr, dtypes_arr)), copy=False)
df.columns.names = self.get_shared_array_names(shm_names.columns)
df.index.names = self.get_shared_array_names(shm_names.index)
return df
def share_ndarray(self, arr: np.ndarray, names: NamesT = None) -> str:
new_shm = self.SharedMemory(create=True, size=arr.nbytes)
filler = np.ndarray(arr.shape, dtype=arr.dtype, buffer=new_shm.buf)
filler[:] = arr[:]
self.set_shape(new_shm.name, arr.shape)
self.set_dtype(new_shm.name, arr.dtype)
self.set_names(new_shm.name, names)
return new_shm.name
def share_dataframe(self, df: pd.DataFrame) -> SharedDFArrNames:
columns_arr = df.columns.to_numpy()
index_arr = df.index.to_numpy()
dtypes_arr = df.dtypes.to_numpy()
columns_name = self.share_ndarray(columns_arr, names=df.columns.names)
index_name = self.share_ndarray(index_arr, names=df.index.names)
dtypes_name = self.share_ndarray(dtypes_arr)
if len(set(dtypes_arr)) == 1:
# The shared dataframe's values will be editable in-place.
# Update the underlying shared np.ndarray through `df.iloc[.., ..] = ..`
values_names = [self.share_ndarray(df.values)]
else:
values_names = [
self.share_ndarray(df.iloc[:, j].to_numpy()) for j in range(df.shape[1])
]
return SharedDFArrNames(
columns=columns_name,
index=index_name,
values=values_names,
dtypes=dtypes_name,
)
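# Usage sketch (an illustrative addition, not part of the original gist's API):
# it mirrors the pattern exercised in `Tests` below. The parent copies an ndarray
# into shared memory and passes only the shared-memory *name* to a forked child;
# the child re-attaches by name and mutates the same buffer in place, so no array
# data is pickled between processes.
def _example_child(shm_name: str) -> None:
    with SharedMemoryManager(is_child=True) as smm:
        shared = smm.get_shared_ndarray(shm_name)
        shared *= 2  # in-place edit, visible to the parent


def _example_share_ndarray() -> None:
    arr = np.arange(6, dtype=np.int64)
    with SharedMemoryManager() as smm:
        shm_name = smm.share_ndarray(arr)
        p = mp.Process(target=_example_child, args=(shm_name,))
        p.start()
        p.join()
        p.close()
        # The child's edits are visible through any handle attached by name.
        np.testing.assert_array_equal(smm.get_shared_ndarray(shm_name), arr * 2)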
class Tests:
@staticmethod
def p_handle_shared_array(name: str) -> None:
with SharedMemoryManager(is_child=True) as smm:
names = smm.get_shared_array_names(name)
arr = smm.get_shared_ndarray(name)
print(names)
print(arr)
assert names is None
np.testing.assert_array_equal(arr, Tests.test__share_array.arr)
@staticmethod
def test__share_array() -> None:
arr = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.int32)
with SharedMemoryManager() as smm:
Tests.test__share_array.arr = arr
p = mp.Process(
target=Tests.p_handle_shared_array,
kwargs=dict(name=smm.share_ndarray(arr)),
)
p.start()
p.join()
p.close()
@staticmethod
def p_handle_shared_dataframe(names: SharedDFArrNames) -> None:
with SharedMemoryManager(is_child=True) as smm:
df: pd.DataFrame = smm.get_shared_dataframe(names)
print(df.head())
pd.testing.assert_frame_equal(df, Tests._test__share_dataframe.df)
@staticmethod
def test__share_dataframe() -> None:
import seaborn as sns
Tests._test__share_dataframe(sns.load_dataset("titanic"))
Tests._test__share_dataframe(sns.load_dataset("titanic").set_index("sex"))
Tests._test__share_dataframe(sns.load_dataset("iris"))
Tests._test__share_dataframe(sns.load_dataset("iris").set_index("species"))
@staticmethod
def test__sub1_dataframe() -> None:
import seaborn as sns
Tests._test__sub1_dataframe(sns.load_dataset("titanic").select_dtypes(float))
Tests._test__sub1_dataframe(sns.load_dataset("iris").select_dtypes(float))
@staticmethod
def _test__share_dataframe(df: pd.DataFrame) -> None:
with SharedMemoryManager() as smm:
Tests._test__share_dataframe.df = df
p = mp.Process(
target=Tests.p_handle_shared_dataframe,
kwargs=dict(names=smm.share_dataframe(df)),
)
p.start()
p.join()
p.close()
@staticmethod
def p_sub1_shared_dataframe(names: SharedDFArrNames) -> None:
with SharedMemoryManager(is_child=True) as smm:
df: pd.DataFrame = smm.get_shared_dataframe(names)
print(df.head())
print(df.values.dtype)
df.iloc[:, :] = df.iloc[:, :] - 1
pd.testing.assert_frame_equal(
df, Tests._test__sub1_dataframe.df - 1, check_less_precise=True
)
@staticmethod
def _test__sub1_dataframe(df: pd.DataFrame) -> None:
assert len(set(df.dtypes.to_numpy())) == 1
with SharedMemoryManager() as smm:
Tests._test__sub1_dataframe.df = df
df_names = smm.share_dataframe(df)
p = mp.Process(
target=Tests.p_sub1_shared_dataframe, kwargs=dict(names=df_names)
)
p.start()
p.join()
p.close()
pd.testing.assert_frame_equal(
smm.get_shared_dataframe(df_names), df - 1, check_less_precise=True
)
@staticmethod
def test_store_result_chunked():
# Must have my `processit` module to run this test.
# Download from: https://gist.github.com/austospumanto/6205276f84cd4dde38f3ce17dddccdb3
from processit import processit
njobs = 6
size = int(1e8)
ncols = 10000
shape = (size // ncols, ncols)
df_input = pd.DataFrame(np.arange(0, size, dtype=np.float64).reshape(shape))
##########
# NORMAL #
##########
expected_time = time.time()
expected = df_input.skew(axis=1).to_numpy()
expected_time = time.time() - expected_time
############
# PARALLEL #
############
parallel_time = time.time()
parallel_actual = np.array(
processit(
[
dict(target=Tests.p_skew_of_ith_row, args=(i, df_input))
for i in df_input.index
],
max_nprocs=njobs,
),
dtype=np.float64,
)
parallel_time = time.time() - parallel_time
###################
# PARALLEL SHARED #
###################
parallel_shared_time = time.time()
arr_output = np.empty(df_input.shape[0], dtype=np.float64)
with SharedMemoryManager() as smm:
df_input_names = smm.share_dataframe(df_input)
arr_output_name = smm.share_ndarray(arr_output)
Tests.p_skew_of_ith_row_shared.df_input_names = df_input_names
Tests.p_skew_of_ith_row_shared.arr_output_name = arr_output_name
processit(
[
dict(target=Tests.p_skew_of_ith_row_shared, args=(i,))
for i in df_input.index
],
max_nprocs=njobs,
)
parallel_shared_actual = smm.get_shared_ndarray(arr_output_name).copy()
parallel_shared_time = time.time() - parallel_shared_time
###########################
# PARALLEL SHARED CHUNKED #
###########################
parallel_shared_chunked_time = time.time()
with SharedMemoryManager() as smm:
df_input_names = smm.share_dataframe(df_input)
Tests.p_skew_chunked.df_input_names = df_input_names
map_args = np.array_split(
ary=df_input.index.to_numpy(),
indices_or_sections=(len(df_input) // njobs) + 1,
)
arr_names = mp.Pool(njobs).map(Tests.p_skew_chunked, map_args, chunksize=1)
parallel_shared_chunked_actual = np.empty(
shape=(df_input.shape[0],), dtype=np.float64
)
np.concatenate(
[
smm.get_shared_ndarray(
name, shape=(map_args[i].size,), dtype=np.float64
)
for i, name in enumerate(arr_names)
],
out=parallel_shared_chunked_actual,
)
parallel_shared_chunked_time = time.time() - parallel_shared_chunked_time
times = dict(
expected_time=expected_time,
parallel_time=parallel_time,
parallel_shared_time=parallel_shared_time,
parallel_shared_chunked_time=parallel_shared_chunked_time,
)
print(json.dumps(times, indent=4, sort_keys=True))
np.testing.assert_array_almost_equal(expected, parallel_actual)
np.testing.assert_array_almost_equal(expected, parallel_shared_actual)
np.testing.assert_array_almost_equal(expected, parallel_shared_chunked_actual)
assert (
parallel_time
< parallel_shared_chunked_time
< expected_time
< parallel_shared_time
)
@staticmethod
def p_skew_of_ith_row_shared(i: int) -> None:
df_input_names: SharedDFArrNames = Tests.p_skew_of_ith_row_shared.df_input_names
arr_output_name: str = Tests.p_skew_of_ith_row_shared.arr_output_name
with SharedMemoryManager(is_child=True) as smm:
df_in: pd.DataFrame = smm.get_shared_dataframe(shm_names=df_input_names)
arr_out: np.ndarray = smm.get_shared_ndarray(shmname=arr_output_name)
arr_out[i] = df_in.iloc[i, :].skew()
@staticmethod
def p_skew_of_ith_row(i: int, df_input: pd.DataFrame) -> np.float64:
return df_input.iloc[i, :].skew()
@staticmethod
def p_skew_chunked(ilocs: np.ndarray) -> str:
df_input_names: SharedDFArrNames = Tests.p_skew_chunked.df_input_names
with SharedMemoryManager(is_child=True) as smm:
df_in: pd.DataFrame = smm.get_shared_dataframe(shm_names=df_input_names)
arr_out: np.ndarray = df_in.iloc[ilocs, :].skew(axis=1).to_numpy()
return smm.share_ndarray(arr=arr_out)
if __name__ == "__main__":
Tests.test_store_result_chunked()
Tests.test__share_array()
Tests.test__share_dataframe()
Tests.test__sub1_dataframe()