# Python 2.* only. For Python 3.* please use
# concurrent.futures.ProcessPoolExecutor (a sketch follows
# the function below).

import pymp


def map_parallel_pymp(obj_list, filter_list, **kwargs):
""" | |
Applies several filters from filter_seq | |
to video frames from obj_seq in parallel manner | |
using pymp (OpenMP for Python). | |
Under construction | |
:param Iterable obj_seq: a sequence of video frames. | |
:param Iterable filter_seq: a sequence of filters. | |
:param dict kwargs: filter options | |
:return: | |
""" | |
    # Number of filters in the sequence.
    filter_number = len(filter_list)
    # Number of worker processes (CPUs).
    PROCESS_NUMBER = 64
    # Shared dict that collects the local result of each
    # process, keyed by map_index.
    shared_res_dict = pymp.shared.dict()
    with pymp.Parallel(PROCESS_NUMBER, if_=True) as map_proc:
        # Inside the parallel region: each process handles one map_index.
        for map_index in map_proc.range(PROCESS_NUMBER):
            # If PROCESS_NUMBER is greater than filter_number,
            # several processes can run the same filter, each on a
            # different chunk of the frame sequence. We use a residue
            # (modulo) sharding scheme: obj_list is split into chunks
            # (partitions), and each partition is handled by a
            # dedicated process.
            # Index of the current filter.
            filter_index = map_index % filter_number
            # Index of the current chunk under the sharding scheme.
            chunk_index = map_index // filter_number
            # Total number of chunks under the sharding scheme.
            chunk_number = PROCESS_NUMBER // filter_number
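            # Worked example (illustrative numbers, an assumption):
            # with PROCESS_NUMBER = 64 and filter_number = 4 we get
            # chunk_number = 16; map_index = 9 then gives
            # filter_index = 9 % 4 = 1 and chunk_index = 9 // 4 = 2,
            # i.e. process 9 applies filter 1 to the third chunk.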
            # Skip leftover processes when filter_number does not
            # divide PROCESS_NUMBER evenly.
            if chunk_index >= chunk_number:
                continue
            # Size of each partition of obj_list; the last chunk
            # absorbs the remainder so no trailing frames are dropped.
            chunk_size = len(obj_list) // chunk_number
            chunk_begin = chunk_size * chunk_index
            if chunk_index == chunk_number - 1:
                chunk_end = len(obj_list)
            else:
                chunk_end = chunk_size * (chunk_index + 1)
            # Get the filter for this process.
            obj_filter = filter_list[filter_index]
            # Get the chunk of objects for this process.
            obj_chunk = obj_list[chunk_begin:chunk_end]
            # Apply the filter to the chunk.
            local_result_seq = obj_filter.filter_objects(
                obj_chunk,
                **kwargs
            )
            local_result_list = list(local_result_seq)
            # Critical section: serialize writes to the shared dict.
            with map_proc.lock:
                # Store the local result into the shared variable.
                shared_res_dict[map_index] = local_result_list
    # Outside the parallel region: remap the result of the
    # partitioned computation by joining the local results
    # for each filter. Iterate in map_index order so the chunks
    # of each filter stay in their original order.
    final_result_dict = {}
    for map_index in sorted(shared_res_dict.keys()):
        filter_index = map_index % filter_number
        final_result_dict.setdefault(filter_index, []).extend(
            shared_res_dict[map_index]
        )
    return final_result_dict
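

# Example usage: a minimal sketch. BlurFilter, EdgeFilter,
# load_frames and the `strength` option are hypothetical names
# (not part of this gist); the filters only need to provide the
# filter_objects(obj_seq, **kwargs) method used above.
#
#     frames = load_frames('input.mp4')
#     filters = [BlurFilter(), EdgeFilter()]
#     result = map_parallel_pymp(frames, filters, strength=0.5)
#     # result[0] -> all frames processed by BlurFilter,
#     # result[1] -> all frames processed by EdgeFilter.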
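

# Python 3 sketch of the same map/remap scheme, using the
# concurrent.futures.ProcessPoolExecutor that the header comment
# suggests. This is an illustrative assumption, not part of the
# original gist; it requires the filters and frames to be picklable.

def _apply_filter(obj_filter, obj_chunk, kwargs):
    # Module-level helper so the process pool can pickle it.
    return list(obj_filter.filter_objects(obj_chunk, **kwargs))


def map_parallel_futures(obj_list, filter_list, process_number=64, **kwargs):
    import concurrent.futures
    filter_number = len(filter_list)
    chunk_number = max(process_number // filter_number, 1)
    chunk_size = max(len(obj_list) // chunk_number, 1)
    final_result_dict = {i: [] for i in range(filter_number)}
    task_dict = {}
    with concurrent.futures.ProcessPoolExecutor(process_number) as pool:
        # Submit one task per (filter, chunk) pair.
        for filter_index, obj_filter in enumerate(filter_list):
            for chunk_index in range(chunk_number):
                chunk_begin = chunk_size * chunk_index
                chunk_end = (len(obj_list)
                             if chunk_index == chunk_number - 1
                             else chunk_size * (chunk_index + 1))
                future = pool.submit(
                    _apply_filter,
                    obj_filter,
                    obj_list[chunk_begin:chunk_end],
                    kwargs,
                )
                task_dict[future] = (filter_index, chunk_index)
        # Merge per-chunk results, keeping chunks in order.
        for future in sorted(task_dict, key=lambda f: task_dict[f]):
            filter_index, _ = task_dict[future]
            final_result_dict[filter_index].extend(future.result())
    return final_result_dict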