Skip to content

Instantly share code, notes, and snippets.

@jfwood
Created March 25, 2015 12:37
Show Gist options
  • Save jfwood/a8130265b0db3c793ec8 to your computer and use it in GitHub Desktop.
Save jfwood/a8130265b0db3c793ec8 to your computer and use it in GitHub Desktop.
Barbican Task Workflow Manager Proposal
class TaskWorkflow(object):
    """Task workflow manager.

    Proposal:

    Manages the processing flow for a :class:`BaseTask` implementation.

    This would replace the @transactional decorator, and provide full
    rollback support to the task's handle_processing() call, and still
    allow for marking the order as being in the ERROR state properly.

    Note that since multiple workers are active at once, and because
    clients could be modifying orders at the same time an order is
    being retried, we need to take care to check for concurrent mods
    to the order in the process_error() method below. Also, the task's
    retrieve_entity() method is assumed to be performing a select-for-
    update on the order record so that no other worker process can
    process that order simultaneously (they would be blocked).
    """

    def __init__(self, task):
        """Wrap ``task`` and record whether database calls are needed.

        :param task: object providing retrieve_entity(),
            handle_processing(), handle_success(), handle_error() and
            get_name(); those are the only members this class calls.
        """
        super(TaskWorkflow, self).__init__()
        self.task = task
        # When False, _commit(), _rollback() and _end_transaction()
        # below do nothing.
        self.is_db_action_needed = queue.is_server_side()

    @staticmethod
    def _log():
        """Return the logger used by this workflow."""
        # Imported at function scope because no module-level import
        # block is visible in this file to extend.
        import logging
        return logging.getLogger(__name__)

    def execute(self, *args, **kwargs):
        """Run the task once, committing on success.

        Retrieves the entity, processes it, reports success to the task
        and commits. If any of those steps raises, the exception is
        logged, the transaction is rolled back and the exception is
        handed to process_error() instead of being re-raised here. The
        transaction is ended in every case.

        :param args: positional arguments forwarded to the task hooks.
        :param kwargs: keyword arguments forwarded to the task hooks.
        :returns: None
        """
        entity = None
        try:
            entity = self.task.retrieve_entity(*args, **kwargs)
            result = self.task.handle_processing(entity, *args, **kwargs)
            self.task.handle_success(entity, result, *args, **kwargs)
            self._commit()
        except Exception as e:
            # Keep a trace of the original failure: process_error() may
            # leave early or fail itself, and would otherwise lose it.
            self._log().exception('Could not perform processing for task.')
            self._rollback()
            self.process_error(entity, e, *args, **kwargs)
        finally:
            self._end_transaction()

    def process_error(
            self, entity_original, process_exception, *args, **kwargs):
        """Record a processing failure on a freshly retrieved entity.

        The entity is retrieved again and compared with the one seen
        before the failure. If it changed in between, a warning is
        logged and nothing is recorded. Otherwise a status/message pair
        is generated from the exception, passed to the task's
        handle_error() and committed.

        This is best effort: an exception raised while recording the
        error is logged and rolled back rather than propagated.

        :param entity_original: entity retrieved before processing, or
            None if that retrieval never completed.
        :param process_exception: exception raised during processing.
        :param args: positional arguments forwarded to the task hooks.
        :param kwargs: keyword arguments forwarded to the task hooks.
        :returns: None
        """
        try:
            entity_refreshed = self.task.retrieve_entity(*args, **kwargs)
            if self._entity_changed(entity_original, entity_refreshed):
                # It is possible (though not probable) that another
                # worker process modified the original entity in the
                # fraction of time between the rollback() call in
                # execute() and this method call. If we detect a
                # modification, warn and leave.
                self._log().warning(
                    'Entity was modified concurrently; not recording '
                    'the processing error on it.')
                return

            status, message = api.generate_safe_exception_message(
                self.task.get_name(), process_exception)
            self.task.handle_error(
                entity_refreshed, status, message, process_exception,
                *args, **kwargs)
            self._commit()
        except Exception:
            # Log before rolling back so the trace survives even if the
            # rollback itself raises.
            self._log().exception(
                'Problem recording the processing error for task.')
            self._rollback()

    def _entity_changed(self, original, latest):
        """Tell whether ``latest`` carries a different ``updated_at``.

        :returns: False when either entity is None, otherwise whether
            the two ``updated_at`` values differ.
        """
        if original is None or latest is None:
            return False
        return original.updated_at != latest.updated_at

    def _commit(self):
        """Commit via ``repositories`` when database action is needed."""
        if self.is_db_action_needed:
            repositories.commit()

    def _rollback(self):
        """Roll back via ``repositories`` when database action is needed."""
        if self.is_db_action_needed:
            repositories.rollback()

    def _end_transaction(self):
        """Call ``repositories.clear()`` when database action is needed."""
        if self.is_db_action_needed:
            repositories.clear()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment