Created
March 25, 2015 12:37
-
-
Save jfwood/a8130265b0db3c793ec8 to your computer and use it in GitHub Desktop.
Barbican Task Workflow Manager Proposal
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TaskWorkflow(object):
    """Task workflow manager.

    Proposal:

    Manages the processing flow for a :class:`BaseTask` implementation.
    This would replace the @transactional decorator, and provide full
    rollback support to the task's handle_processing() call, and still
    allow for marking the order as being in the ERROR state properly.

    Note that since multiple workers are active at once, and because
    clients could be modifying orders at the same time an order is
    being retried, we need to take care to check for concurrent mods
    to the order in the process_error() method below. Also, the task's
    retrieve_entity() method is assumed to be performing a select-for-
    update on the order record so that no other worker process can
    process that order simultaneously (they would be blocked).
    """

    def __init__(self, task):
        """Bind the workflow to ``task``.

        :param task: object providing ``retrieve_entity()``,
            ``handle_processing()``, ``handle_success()``,
            ``handle_error()`` and ``get_name()``.
        """
        super(TaskWorkflow, self).__init__()
        self.task = task
        # Commit/rollback/clear calls are skipped entirely when this is
        # false (see the _commit/_rollback/_end_transaction helpers).
        self.is_db_action_needed = queue.is_server_side()

    def execute(self, *args, **kwargs):
        """Run the task once inside a single unit of work.

        On success the work is committed. On any exception the work is
        rolled back and :meth:`process_error` is given the chance to
        record the failure. The transaction is always ended afterwards.

        ``args``/``kwargs`` are passed through unchanged to the task's
        methods. Returns ``None``.
        """
        entity = None
        try:
            entity = self.task.retrieve_entity(*args, **kwargs)
            result = self.task.handle_processing(entity, *args, **kwargs)
            self.task.handle_success(entity, result, *args, **kwargs)
            self._commit()
        except Exception as e:
            # Undo partial work first so the error bookkeeping below starts
            # from a clean slate.
            self._rollback()
            self.process_error(entity, e, *args, **kwargs)
        finally:
            self._end_transaction()

    def process_error(
            self, entity_original, process_exception, *args, **kwargs):
        """Record ``process_exception`` against a freshly retrieved entity.

        :param entity_original: entity retrieved by :meth:`execute` before
            the failure, or ``None`` if retrieval itself failed.
        :param process_exception: the exception raised during processing.

        Never raises for failures inside the bookkeeping itself: those
        are logged and rolled back, keeping this a best-effort step.
        """
        # Function-scope import: this block must not rely on a module-level
        # ``logging`` import being present in the file.
        import logging
        log = logging.getLogger(__name__)

        try:
            entity_refreshed = self.task.retrieve_entity(*args, **kwargs)

            if self._entity_changed(entity_original, entity_refreshed):
                # It is possible (though not probable) that another worker
                # process modified the original entity in the fraction of
                # time between the rollback() call in execute() and this
                # method call. If we detect a modification, warn and leave
                # the entity untouched.
                log.warning(
                    "Task '%s': entity was modified concurrently; "
                    "not recording processing error %r",
                    self.task.get_name(), process_exception)
                return

            status, message = api.generate_safe_exception_message(
                self.task.get_name(), process_exception)
            self.task.handle_error(
                entity_refreshed, status, message, process_exception,
                *args, **kwargs)
            self._commit()
        except Exception:
            # Still best-effort, but no longer silent: without this log a
            # failure to record the error state would leave no trace.
            log.exception(
                "Could not record the error state for task %s "
                "(original processing error: %r)",
                type(self.task).__name__, process_exception)
            self._rollback()

    def _entity_changed(self, original, latest):
        """Return True when both entities exist and ``updated_at`` differs.

        If either entity is missing (falsy) no comparison is possible and
        False is returned.
        """
        if original and latest:
            return original.updated_at != latest.updated_at
        else:
            return False

    def _commit(self):
        """Commit via ``repositories`` when database actions are enabled."""
        if self.is_db_action_needed:
            repositories.commit()

    def _rollback(self):
        """Roll back via ``repositories`` when database actions are enabled."""
        if self.is_db_action_needed:
            repositories.rollback()

    def _end_transaction(self):
        """Clear ``repositories`` state when database actions are enabled."""
        if self.is_db_action_needed:
            repositories.clear()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment