Skip to content

Instantly share code, notes, and snippets.

@mrknmc
Created September 8, 2023 11:12
Show Gist options
  • Save mrknmc/2345dc17aac64830cb34a13fa99ee688 to your computer and use it in GitHub Desktop.
Save mrknmc/2345dc17aac64830cb34a13fa99ee688 to your computer and use it in GitHub Desktop.
workflow.py
from dataclasses import dataclass
from datetime import timedelta
from uuid import UUID
from temporalio import activity
from temporalio import workflow
from parrot.core.logging import get_logger
logger = workflow.LoggerAdapter(get_logger(__name__), None)
@dataclass
class UploadDocumentActivityParams:
document_type: str
booking_id: UUID
@dataclass
class ClaimedSubtaskWorkflowParams:
subtask_id: str
@dataclass
class SubtasksWorkflowParams:
surge: bool
@dataclass
class CreateAgreementActivityParams:
document_type: str
document_id: str
booking_id: UUID
@dataclass
class ManualTranscriptionWorkflowParams:
document_type: str
booking_id: UUID
def is_subtask_claimed(subtask_id: str) -> bool:
return False
def is_subtask_in_final_state(subtask_id: str) -> bool:
return False
@activity.defn
def create_subtask_media_resources():
# TODO: produce & store media resources in S3
pass
@activity.defn
def notify_surge():
# TODO: send email to reviewers
pass
@activity.defn
def create_subtasks():
# TODO: create subtasks and save them in DB
pass
@workflow.defn
class ClaimedSubtaskWorkflow:
@workflow.run
async def run(self, params: ClaimedSubtaskWorkflowParams):
while 1:
await workflow.wait_condition(lambda: is_subtask_claimed(params.subtask_id))
result = await workflow.wait_condition(
lambda: is_subtask_in_final_state(params.subtask_id),
timeout=timedelta(days=1),
)
# TODO: either unclaim if timeout or go back to waiting for it to be claimed
if result.completed:
break
@workflow.defn
class SubtasksWorkflow:
@workflow.run
async def run(self, params: SubtasksWorkflowParams):
media_resources = await workflow.execute_activity(
create_subtask_media_resources
)
subtasks = await workflow.execute_activity(create_subtasks, media_resources)
if params.surge:
subtasks = await workflow.execute_activity(notify_surge, media_resources)
subtask_flows = [
workflow.execute_child_workflow(ClaimedSubtaskWorkflow, subtask)
for subtask in subtasks
]
# wait for all subtasks to be done
await workflow.wait_condition(lambda: all(subtask_flows))
@workflow.defn
class ReviewWorkflow:
@workflow.run
async def run(self):
# TODO: same idea as above
pass
@workflow.defn
class ManualTranscriptionWorkflo:
@workflow.run
async def run(self, params: ManualTranscriptionWorkflowParams):
# TODO : fix wrong enum serialization
logger.info(
f"Document signature workflow for {params.document_type}-{params.booking_id} started"
)
await workflow.start_child_workflow(SubtasksWorkflow, None)
await workflow.start_child_workflow(ReviewWorkflow, None)
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment