"""A bunch of utilities that prove useful with the SCOOP Parallelization framework"""
###############################################################################
# BSD 3-Clause License
#
# Copyright (c) 2023, maharjun
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
###############################################################################
import socket
import os
import time
from typing import Callable
import numpy as np
from uuid import uuid4
from scoop import futures
import scoop
# This file can be taken from the Numpy utils gist in https://gist.github.com/maharjun
from utils.numpyutils import almost_equal_split
# Set this in case SCOOP is losing workers due to slow network setup times
# scoop.TIME_BEFORE_LOSING_WORKER = <higher-value>
def scoop_blank_function():
pass
class ScoopPaddedUniqueFunction:
"""
    A class that ensures that a unique function is executed (at most once per
    worker) in a SCOOP-parallelized environment, with a padding time to help
    distribute the jobs across the workers.
Attributes
----------
function : callable
The function to be executed.
unique_name : str
The unique identifier for this instance of the class.
pad_atleast : float
The padding time in seconds before the next job can be executed.
"""
__jobs_run__ = set()
def __init__(self, function, unique_name, pad_atleast=5.):
"""
Parameters
----------
function : callable
The function to be executed.
unique_name : str
The unique identifier for this instance of the class.
        pad_atleast : float, optional
            The padding time in seconds before the next job can be executed, by
            default 5.0. The padding time helps to ensure that the jobs are
            properly distributed across the different workers. It needs to be
            long enough that the jobs can be sent to every worker before the
            first job finishes; this depends on the parallelism setup, which is
            why this parameter is adjustable.
"""
self.pad_atleast = pad_atleast
self.function = function
self.unique_name = unique_name
def __call__(self, dummy):
"""
Executes the function and ensures the padding time before
the next job can be executed. This method also guards against
re-execution of the same function on the same worker.
Parameters
----------
dummy : Any
A dummy argument that is not used in the function but is required
because the SCOOP framework expects the mapped functions to have
at least one argument.
Returns
-------
tuple
A tuple containing the worker's hostname and process ID.
"""
if self.unique_name in ScoopPaddedUniqueFunction.__jobs_run__:
time.sleep(self.pad_atleast)
else:
init_time = time.time()
self.function()
time_elapsed = time.time() - init_time
if time_elapsed < self.pad_atleast:
time.sleep(self.pad_atleast - time_elapsed)
            # record that this job has been run on this worker
            ScoopPaddedUniqueFunction.__jobs_run__.add(self.unique_name)
return (socket.gethostname(), os.getpid())
def scoop_exactly_once_on_all_workers(function, init_estimate_n_workers=10, holdtime=5.):
"""
Ensures that the given function is executed exactly once on all workers
in a SCOOP parallelized environment.
Parameters
----------
function : callable
The function to be executed on all workers.
init_estimate_n_workers : int, optional
The initial estimated number of workers, by default 10.
holdtime : float, optional
The padding time in seconds before the next job can be executed,
by default 5.0.
Returns
-------
set
A set of tuples containing the workers' hostnames and process IDs.
"""
    if not scoop.IS_RUNNING:
        return {(socket.gethostname(), os.getpid())}
    assert scoop.IS_ORIGIN, "scoop_exactly_once_on_all_workers can only be run from the origin"
wrapped_func = ScoopPaddedUniqueFunction(function, str(uuid4()), pad_atleast=holdtime)
    is_verified_count = False
    previous_estimate = 1
    while not is_verified_count:
curr_n_workers = max(previous_estimate, init_estimate_n_workers)
map_results = []
map_results_set = set()
while len(map_results) == len(map_results_set):
curr_n_workers += (curr_n_workers + 1) // 2
map_results = list(futures.map(wrapped_func, range(curr_n_workers)))
map_results_set = set(map_results)
if len(map_results_set) == previous_estimate:
return map_results_set
else:
assert len(map_results_set) > previous_estimate, "Nodes lost during estimate!!! this shouldn't happen"
previous_estimate = len(map_results_set)
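

# Illustrative usage sketch (not part of the original utilities): run a
# per-worker initializer exactly once on every SCOOP worker. `_example_init_worker`
# is a hypothetical initializer, and the script is assumed to be launched via
# `python -m scoop <script>.py`.
def _example_init_worker():
    # e.g. give each worker process its own RNG seed
    np.random.seed(os.getpid() % (2 ** 32))


def _example_run_init_on_all_workers():
    workers = scoop_exactly_once_on_all_workers(_example_init_worker,
                                                init_estimate_n_workers=16,
                                                holdtime=5.)
    print(f"Initialized {len(workers)} workers:", sorted(workers))
    return len(workers)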
def get_n_scoop_workers(init_estimate=10, holdtime=5.):
    """
    Runs a dummy task on all workers to find the number of available SCOOP workers.

    Parameters
    ----------
    init_estimate: int
        Initial estimate of the number of available workers. It is best if it is
        just bigger than the actual number of available workers.
    holdtime: float
        The holdtime, in seconds, of the dummy task used to probe the workers.
        It needs to be long enough that the jobs can be sent to every worker
        before the first job finishes; this depends on the parallelism setup,
        which is why this parameter is adjustable.

    Returns
    -------
    int
        The number of available SCOOP workers (1 if SCOOP is not running).
    """
if not scoop.IS_RUNNING:
return 1
    assert scoop.IS_ORIGIN, "get_n_scoop_workers can only be run from the origin"
n_scoop_workers = len(scoop_exactly_once_on_all_workers(scoop_blank_function, init_estimate, holdtime))
return n_scoop_workers
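

# Illustrative sketch (not part of the original utilities): when the script is
# launched under SCOOP (e.g. `python -m scoop -n 16 this_script.py`), this
# reports the number of reachable workers; without SCOOP it simply returns 1.
def _example_count_workers():
    return get_n_scoop_workers(init_estimate=16, holdtime=5.)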
class BatchParallelizeScoop:
"""
Takes a function `function` that takes a batched input (with all but the last
dimension being the batch dimensions). When calling `__call__`, this class
splits the input_array into `n_scoop_workers` sub-arrays and parallelizes the
computation of `function` over them
Parameters
----------
n_scoop_workers : int
The number of scoop workers (see ~get_n_scoop_workers to evaluate the number of
available scoop workers)
function : Callable
A function that takes as the first argument a numpy ndarray where all but the
last dimension of the array represent batch dimensions
*args, **kwargs
Additional positional and keyword arguments to be evaluated by the function
Methods
-------
    __call__(input_array):
        The logic is as follows: flatten all but the last dimension of
        input_array, split this array into `n_scoop_workers` chunks with each
        chunk being almost equal in size, run the function `function` on each of
        these chunks, and reassemble the output into the original batch shape.
"""
def __init__(self, n_scoop_workers: int, function: Callable, *args, **kwargs):
from functools import partial
self.func_partial = partial(function, *args, **kwargs)
self.n_scoop_workers = n_scoop_workers
@staticmethod
def _concatenate_result(result_list):
if isinstance(result_list[0], np.ndarray):
return np.concatenate(result_list, axis=0)
elif result_list[0] is None:
return None
elif isinstance(result_list[0], tuple) and len(result_list[0]) > 0:
result_tuple = ()
for i in range(len(result_list[0])):
result_tuple += (BatchParallelizeScoop._concatenate_result([x[i] for x in result_list]),)
return result_tuple
else:
raise TypeError("The function wrapped by BatchParallelizeScoop"
" should return either ndarrays, None"
" or nested tuples of the above two types")
@staticmethod
def _reshape_results(results, shape):
if isinstance(results, np.ndarray):
return np.reshape(results, shape)
elif results is None:
return results
elif isinstance(results, tuple) and len(results) > 0:
result_tuple = ()
for res in results:
result_tuple += (BatchParallelizeScoop._reshape_results(res, shape),)
return result_tuple
else:
raise TypeError("Invalid type to reshape (required either np.ndarray, or a nested tuple of the same)")
def __call__(self, input_array: np.ndarray):
"""
        The logic is as follows: flatten all but the last dimension of
        input_array, split this array into `n_scoop_workers` chunks with each
        chunk being almost equal in size, run the function `function` on each of
        these chunks, and reassemble the output into the original batch shape.
Parameters
----------
input_array : np.ndarray
The input array that is to be evaluated by the function `function`
"""
assert input_array.size > 0, \
"The input array to calculate the function on using BatchParallelizeScoop cannot be empty"
old_shape = input_array.shape
input_array = np.reshape(input_array, (-1, old_shape[-1]))
if self.n_scoop_workers == 1:
ret_score_array = self.func_partial(input_array)
else:
input_array_split = almost_equal_split(input_array, self.n_scoop_workers)
input_array_split = [x for x in input_array_split if x.size > 0]
result_list = list(futures.map(self.func_partial, input_array_split))
ret_score_array = BatchParallelizeScoop._concatenate_result(result_list)
return BatchParallelizeScoop._reshape_results(ret_score_array, old_shape[:-1])
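

# Illustrative usage sketch (not part of the original utilities): parallelize a
# batched function over all available SCOOP workers. `_example_row_norms` is a
# hypothetical batched function used only for demonstration.
def _example_row_norms(batch: np.ndarray) -> np.ndarray:
    # `batch` has shape (n_batch, n_features); the result has shape (n_batch,)
    return np.linalg.norm(batch, axis=-1)


def _example_batch_parallelize():
    n_workers = get_n_scoop_workers(init_estimate=16)
    parallel_norms = BatchParallelizeScoop(n_workers, _example_row_norms)
    data = np.random.rand(8, 32, 10)  # batch dimensions (8, 32), feature dimension 10
    norms = parallel_norms(data)      # result has shape (8, 32)
    return norms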
class ScoopSharedContext:
    """
    A context manager that shares constant values with SCOOP workers by pickling
    them into a temporary directory (by default the one named by the CLUSTER_TEMP
    environment variable), assumed to be readable by all workers. The pickled
    files are removed when the context exits.
    """

    __shared_cache__ = {}

    def __init__(self, temp_directory: str = None):
        # resolved here so that importing this module does not require CLUSTER_TEMP to be set
        self.temp_directory = temp_directory if temp_directory is not None else os.environ['CLUSTER_TEMP']
        self.filename_map = {}
def set_const(self, name, value):
import dill
from tempfile import NamedTemporaryFile
with NamedTemporaryFile(mode="wb", dir=self.temp_directory, delete=False) as fout:
dill.dump(value, fout, protocol=-1)
self.filename_map[name] = fout.name
def get_const(self, name, cache: bool = False):
if name not in self.filename_map:
raise KeyError(f"Found no shared variable named '{name}' in this shared_context")
import dill
cache_key = (self.temp_directory, self.filename_map[name])
get_value_from_file = (not cache) or (cache_key not in ScoopSharedContext.__shared_cache__)
if get_value_from_file:
with open(self.filename_map[name], 'rb') as fin:
value = dill.load(fin)
if cache:
ScoopSharedContext.__shared_cache__[cache_key] = value
else:
value = ScoopSharedContext.__shared_cache__[cache_key]
return value
    @classmethod
    def clear_cache(cls):
        cls.__shared_cache__.clear()
def __enter__(self):
return self
    def __exit__(self, type, value, traceback):
        for filename in self.filename_map.values():
            # the stored names are already full paths from NamedTemporaryFile
            os.remove(filename)
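

# Illustrative usage sketch (not part of the original utilities): share a large
# constant with the workers through the file system instead of shipping it with
# every task. This assumes CLUSTER_TEMP (or an explicit temp_directory) points to
# a directory that every worker can read; in a real SCOOP script the worker
# function reading the constant would live at module level so it can be pickled.
def _example_shared_context():
    with ScoopSharedContext() as ctx:
        ctx.set_const('weights', np.random.rand(1000, 1000))
        # get_const re-reads the pickled value (here on the origin, but the same
        # call works on a worker); cache=True keeps it in the per-process cache
        # so repeated lookups do not re-read the file.
        weights = ctx.get_const('weights', cache=True)
        return weights.shape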