Last active
April 21, 2017 00:30
-
-
Save w495/82f7b21509a69a0d70e18f2e4ddf5ed9 to your computer and use it in GitHub Desktop.
Example for concurrent.futures.ProcessPoolExecutor. One of the key ideas is to use `as_completed` to get unordered results. If the sequence order is important, you should sort (or re-order) the results yourself. This is part of https://github.com/w495/python-video-shot-detector
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
import multiprocessing as mp | |
from concurrent.futures import ProcessPoolExecutor, as_completed | |
from .base_extractor import BaseExtractor | |
class ParallelExtractor(BaseExtractor):
    """Run the parent extractor's ``transform_frame_images`` in worker processes.

    Input images are split into groups of ``IMAGE_GROUP_SIZE``. Each group is
    transformed in a worker process by the parent class's
    ``transform_frame_images``. Results are collected with ``as_completed``
    (completion order) and re-ordered by group index, so the output follows
    the input order.
    """

    # Number of worker processes; evaluated once, when the class is defined.
    POOL_SIZE = mp.cpu_count()
    # Number of images sent to a worker as one task.
    IMAGE_GROUP_SIZE = 512

    def transform_frame_images(self, image_seq, **kwargs):
        """Yield transformed images in input order, computed in parallel.

        The executor stays alive while results are consumed, so a group is
        yielded as soon as it and every earlier group have finished, rather
        than only after the whole pool has shut down.

        :param image_seq: iterable of images. Groups are sent to worker
            processes, so the images, ``self`` and ``kwargs`` must be
            picklable.
        :param kwargs: forwarded to the parent ``transform_frame_images``.
        :raises: whatever a worker's transform raised (re-raised by
            ``Future.result``).
        """
        with ProcessPoolExecutor(self.POOL_SIZE) as executor:
            future_list = list(
                self._submit_image_groups(executor, image_seq, **kwargs)
            )
            try:
                # Groups complete out of order; hold each one until the next
                # expected index arrives so the output keeps input order.
                pending = {}
                next_index = 0
                for index, group in self.future_result_seq(future_list):
                    pending[index] = group
                    while next_index in pending:
                        for image in pending.pop(next_index):
                            yield image
                        next_index += 1
            finally:
                # On early exit (consumer stopped iterating, or a worker
                # raised) drop still-queued tasks so the executor's shutdown
                # does not wait for work nobody will read. No-op for futures
                # that are already running or done.
                for future in future_list:
                    future.cancel()

    def future_result_seq(self, future_seq):
        """Yield ``future.result()`` for each future, in completion order.

        ``future.result()`` re-raises any exception raised in the worker.
        """
        for future in as_completed(list(future_seq)):
            yield future.result()

    def image_group_future_seq(self, image_seq, **kwargs):
        """Submit image groups to a private pool and yield their futures.

        Kept for backward compatibility; ``transform_frame_images`` no longer
        uses it. The pool lives only while this generator runs. It is shut
        down (waiting for every submitted task) when the generator is
        exhausted or closed, so a consumer that drains it with ``list()``
        receives futures that are already finished.
        """
        with ProcessPoolExecutor(self.POOL_SIZE) as executor:
            for future in self._submit_image_groups(
                    executor, image_seq, **kwargs):
                yield future

    def _submit_image_groups(self, executor, image_seq, **kwargs):
        """Submit one task per image group to *executor*; yield the futures."""
        for index, image_group in enumerate(self.image_group_seq(image_seq)):
            # Serialization: make sure the group is a concrete list so it can
            # be pickled and sent to the worker process.
            image_list = list(image_group)
            yield executor.submit(
                self.local_transform_frame_images,
                index,
                image_list,
                **kwargs
            )

    def local_transform_frame_images(self, index, image_list, **kwargs):
        """Worker-side task: transform one group with the parent implementation.

        :param index: position of the group in the input; returned unchanged.
        :param image_list: the images of this group.
        :returns: ``(index, transformed_image_list)``. The index lets the
            caller restore the input order.
        """
        # Deserialization: the parent implementation consumes an iterator.
        image_seq = iter(image_list)
        image_seq = super(ParallelExtractor, self).transform_frame_images(
            image_seq, **kwargs
        )
        # Materialize: a generator cannot be pickled back to the parent process.
        return index, list(image_seq)

    def image_group_seq(self, image_seq):
        """Split *image_seq* into lists of at most ``IMAGE_GROUP_SIZE`` items."""
        size = self.IMAGE_GROUP_SIZE
        it = iter(image_seq)
        group = list(itertools.islice(it, size))
        while group:
            yield group
            group = list(itertools.islice(it, size))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment