@w495
Last active August 3, 2016 03:51
# Python 2.* only. For Python 3.* please use concurrent.futures.ProcessPoolExecutor.
import pymp


def map_parallel_pymp(obj_list, filter_list, **kwargs):
"""
Applies several filters from filter_seq
to video frames from obj_seq in parallel manner
using pymp (OpenMP for Python).
Under construction
:param Iterable obj_seq: a sequence of video frames.
:param Iterable filter_seq: a sequence of filters.
:param dict kwargs: filter options
:return:
"""
    # Number of filters in the sequence.
    filter_number = len(filter_list)
    # Number of processes (CPUs).
    PROCESS_NUMBER = 64
    # Initialize a shared variable that collects
    # the result produced by each process,
    # keyed by that process's map_index.
    shared_res_dict = pymp.shared.dict(
        {i: [] for i in range(PROCESS_NUMBER)}
    )
    with pymp.Parallel(PROCESS_NUMBER, if_=True) as map_proc:
        # Parallel region: map_proc.range distributes
        # the iterations across the processes.
        for map_index in map_proc.range(PROCESS_NUMBER):
            # If PROCESS_NUMBER is greater than filter_number,
            # several processes can run the same filter,
            # but on different chunks of the frame sequence.
            # We use a residue sharding schema:
            # obj_list is split into several chunks (partitions),
            # and each partition is handled by a dedicated process.
            # Index of the current filter.
            filter_index = map_index % filter_number
            # Index of the current chunk due to the sharding schema.
            chunk_index = map_index // filter_number
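            # E.g. with filter_number == 4 and PROCESS_NUMBER == 64,
            # map_index 10 runs filter 10 % 4 == 2
            # on chunk 10 // 4 == 2 of that filter's input.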
            # The total number of chunks due to the sharding schema.
            # (Assumes PROCESS_NUMBER >= filter_number.)
            chunk_number = PROCESS_NUMBER // filter_number
            # Size of each partition of obj_list.
            chunk_size = len(obj_list) // chunk_number
            chunk_begin = chunk_size * chunk_index
            # The last chunk also takes the remainder when
            # len(obj_list) is not divisible by chunk_number.
            if chunk_index == chunk_number - 1:
                chunk_end = len(obj_list)
            else:
                chunk_end = chunk_size * (chunk_index + 1)
            # Get the local filter for this process
            # (renamed so it does not shadow the built-in filter).
            cur_filter = filter_list[filter_index]
            # Get the local chunk of objects.
            obj_chunk = obj_list[chunk_begin:chunk_end]
            # Apply the local filter to the chunk.
            local_result_seq = cur_filter.filter_objects(
                obj_chunk,
                **kwargs
            )
            local_result_list = list(local_result_seq)
            # Critical section: serialize writes to the shared dict.
            with map_proc.lock:
                # Store the local result into the shared variable.
                shared_res_dict[map_index] = local_result_list
    # Out of the parallel region.
    # Remap the result of the partitioned computation:
    # join the local results for each filter, iterating in
    # map_index order so chunks keep the original frame order.
    final_result_dict = {}
    for map_index in sorted(shared_res_dict.keys()):
        filter_index = map_index % filter_number
        final_result_dict.setdefault(filter_index, []).extend(
            shared_res_dict[map_index]
        )
    return final_result_dict
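
The header suggests concurrent.futures.ProcessPoolExecutor for Python 3. A sketch of the same per-(filter, chunk) map in that style follows; the chunked helper and the task layout are assumptions, not the author's code, and the filter objects must be picklable.

from concurrent.futures import ProcessPoolExecutor


def chunked(seq, chunk_number):
    # Split seq into contiguous chunks of roughly len(seq) // chunk_number items.
    size = max(1, len(seq) // chunk_number)
    return [seq[i:i + size] for i in range(0, len(seq), size)]


def apply_filter(filter_obj, chunk, kwargs):
    return list(filter_obj.filter_objects(chunk, **kwargs))


def map_parallel_futures(obj_list, filter_list, process_number=64, **kwargs):
    chunk_number = max(1, process_number // len(filter_list))
    final_result_dict = {}
    with ProcessPoolExecutor(max_workers=process_number) as executor:
        # One task per (filter, chunk) pair, mirroring the residue sharding above.
        futures = {
            (filter_index, chunk_index):
                executor.submit(apply_filter, filter_obj, chunk, kwargs)
            for filter_index, filter_obj in enumerate(filter_list)
            for chunk_index, chunk in enumerate(chunked(obj_list, chunk_number))
        }
        # Collect in (filter_index, chunk_index) order to keep frame order.
        for key in sorted(futures):
            filter_index = key[0]
            final_result_dict.setdefault(filter_index, []).extend(
                futures[key].result()
            )
    return final_result_dict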
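
A minimal usage sketch (not part of the original gist): IdentityFilter below is a hypothetical stand-in for whatever filter class the gist assumes; the only contract used above is a filter_objects(obj_seq, **kwargs) method that yields filtered frames.

class IdentityFilter(object):
    # Hypothetical filter: passes frames through unchanged.
    def filter_objects(self, obj_seq, **kwargs):
        for obj in obj_seq:
            yield obj


if __name__ == '__main__':
    frames = list(range(1000))  # stand-in for decoded video frames
    filters = [IdentityFilter(), IdentityFilter()]
    result = map_parallel_pymp(frames, filters)
    # result[0] and result[1] each hold all 1000 frames,
    # reassembled from the per-process chunks.
    print(len(result[0]))
    print(len(result[1]))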