Skip to content

Instantly share code, notes, and snippets.

@orklann
Forked from Rustem/mock_image_processing.py
Created May 24, 2021 08:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save orklann/03120f52ec603feec13ed369737bf17a to your computer and use it in GitHub Desktop.
Save orklann/03120f52ec603feec13ed369737bf17a to your computer and use it in GitHub Desktop.
"""
1. Store the image and create an instance of ImagefileProcessHistory with status NOT_STARTED
ImageFileProcessHistory
- local_path
- file_name
- status # enum: NOT_STARTED, STARTED, SUCCEEDED, FAILED
- bytes_processed
- bytes_total
"""
#2. Call a celery task with the id of just created instance which will be responsible for actual image processing. Reply http response with the instance id `image_process_history_id`
@shared_task
process_image(image_process_history_id):
mngr = ImageFileProcessingManager.load(image_process_history_id)
#
current_status = mngr.process()
return current_status, image_process_history_id
# 3. Then you can periodically poll your server via json rest service to get the current progress of the history object. ProgressTracker will upgrade it depending on PROGRESS_GRANULARITY (say each 10000 bytes). See below:
# ... Django rest service
# 4. You need a specified manager ImageFileProcessingManager and processor ImageProcessor itself which will orchestrate the actual processing:
class ImageFileProcessingManager(object):
history_obj = None
trackers = None
@classmethod
def load(cls, history_id):
history_obj = ImageFileProcessHistory.objects().get_obj(history_id)
if history_obj is None:
raise DoesNotExist("...")
return cls(history_id)
def __init__(self, history_obj):
self.history_obj = history_obj
def register_progress_tracker():
self.trackers.append(ProgressTracker(self.history_obj))
def process(self):
# orchestrate your processing here
# on your use case
processor = ImageProcessor()
processor.register_progress_tracker(ProgressTracker(self.history_obj))
try:
self.set_status(statuses.STARTED)
return processor.process(self.history_obj.get_full_path())
except YouRCustomException:
self.set_status(statuses.FAILED)
finally:
self.set_status(statuses.COMPLETED)
return self.history_obj.status
class ImageProcessor(object):
def __init__(self):
self.trackers = []
def register_progress_tracker(self, tracker):
self.trackers.append(tracker)
def process(self, file_path):
# load file
# do something computationally intensive
# update your progress tracker regularly - could be upgraded each 1000 bytes or depending on your scenario
# return prog
# 5. In case you need some clue about ProgressTracker
class ProgressTracker(object):
def __init__(self, history_obj):
"""
Creates the progress tracker
:param history_obj: Import history with the progress that should be updated
"""
self.history_obj = history_obj
self.counter = 0
def tell(self, progress):
"""
Updates the import history progress
:param progress: Progress in bytes
:type progress: int
"""
self.counter += 1
if self.counter % settings.PROGRESS_GRANULARITY == 0:
self.history_obj.bytes_imported = progress
self.history_obj.save()
def finished(self):
self.history_obj.bytes_imported = self.history_obj.bytes_total
self.history_obj.save()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment