Skip to content

Instantly share code, notes, and snippets.

@klen
Created December 10, 2014 11:04
Show Gist options
  • Save klen/fce99b083a789000dda5 to your computer and use it in GitHub Desktop.
Save klen/fce99b083a789000dda5 to your computer and use it in GitHub Desktop.
from django.core.exceptions import ValidationError, PermissionDenied
from .models import QualityProcess
from .utils.flow import task_loader
from viewflow import flow, lock
from viewflow.base import Flow, this
class QualityFlow(Flow):
    """Flow definition for the qualification process.

    Node order as wired below::

        start -> approve -> check_approve -> send_approved -> end
                                          -> send_rejected -> end
    """

    process_cls = QualityProcess
    lock_impl = lock.select_for_update_lock

    # Start a quality process
    @flow.StartFunction
    def start(activation, organization):
        """Start the qualification process for ``organization``.

        Args:
            activation: Activation object supplied by the flow machinery.
            organization: Object with a ``role`` attribute; it is stored on
                the new process.

        Raises:
            ValidationError: If ``organization.role`` is not ``'supplier'``.
        """
        # Validate before touching the activation so that a rejected call
        # leaves nothing prepared.
        if organization.role != 'supplier':
            raise ValidationError('Invalid organization')

        activation.prepare()
        activation.process.organization = organization
        # NOTE(review): there is no explicit process.save() here, unlike in
        # approve(); presumably activation.done() persists the new
        # process -- confirm against the viewflow version in use.
        activation.done()

    start.Next(this.approve)

    # Approve a quality process
    @flow.Function
    @flow.flow_func(task_loader=task_loader)
    def approve(activation, process, reviewer=None, level=0, approve=True):
        """Record a reviewer's decision on the process.

        Args:
            activation: Activation object supplied by the flow machinery.
            process: Not used directly; the body works through
                ``activation.process``.
            reviewer: Object with a truthy ``is_personal`` attribute.
                Required in practice: a missing reviewer is rejected.
            level: Value stored in ``process.level``.
            approve: Decision stored in ``process.approved``; it later
                drives the ``check_approve`` branch.

        Raises:
            PermissionDenied: If ``reviewer`` is falsy or
                ``reviewer.is_personal`` is falsy.
        """
        # Authorize first, mirroring start(): an unauthorized call must not
        # reach activation.prepare() and leave a half-started activation.
        if not (reviewer and reviewer.is_personal):
            raise PermissionDenied

        activation.prepare()
        activation.process.approved = approve
        activation.process.reviewed_by = reviewer
        activation.process.level = level
        # Persist the decision before completing the task so that the
        # check_approve condition reads the stored value.
        activation.process.save()
        activation.done()

    approve.Next(this.check_approve)

    # Branch on the decision recorded by approve().
    check_approve = (
        flow.If(cond=lambda p: p.approved)
        .OnTrue(this.send_approved)
        .OnFalse(this.send_rejected)
    )

    @flow.Function
    @flow.flow_func(task_loader=task_loader)
    def send_approved(activation, process, *args, **kwargs):
        """Complete the "approved" branch.

        Only prepares and finishes the task; no notification is sent from
        this function itself.
        """
        activation.prepare()
        activation.done()

    send_approved.Next(this.end)

    @flow.Function
    @flow.flow_func(task_loader=task_loader)
    def send_rejected(activation, process, *args, **kwargs):
        """Complete the "rejected" branch.

        Only prepares and finishes the task; no notification is sent from
        this function itself.
        """
        activation.prepare()
        activation.done()

    send_rejected.Next(this.end)

    end = flow.End()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment