Skip to content

Instantly share code, notes, and snippets.

@nathanielobrown
Last active January 30, 2023 18:36
Show Gist options
  • Save nathanielobrown/77e9d7500e317317a8ee3ffc59ea35dc to your computer and use it in GitHub Desktop.
Save nathanielobrown/77e9d7500e317317a8ee3ffc59ea35dc to your computer and use it in GitHub Desktop.
Testing delivery of activity cancellation exceptions (CancelledError) for the Temporal Python SDK
import concurrent.futures
import time
from datetime import timedelta
import requests
import rich
from temporalio import activity, workflow
from temporalio.client import WorkflowFailureError
from temporalio.exceptions import CancelledError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
def print(*args, **kwargs):
    """Forward all positional and keyword arguments to ``rich.print``.

    This intentionally shadows the builtin ``print`` for the rest of the
    module, so the messages in this file that carry ``[color]...[/color]``
    tags are emitted through rich instead of the plain builtin.
    """
    emit = rich.print
    emit(*args, **kwargs)
@activity.defn
def slow_activity(arg: str):
    """Activity that performs a slow blocking download and reports how it ended.

    Prints a start message, then downloads a large archive with ``requests``.
    If a ``CancelledError`` is raised while downloading, the elapsed time and
    the error are printed and ``None`` is returned. Otherwise a completion
    message is printed and ``f"result {arg}"`` is returned.

    Args:
        arg: Label used in the printed messages and in the returned string.
    """
    # Big file that will be slow to download (~4 seconds). Used instead of
    # time.sleep just to make **sure** that HTTP requests would be cancelled
    download_url = "https://github.com/docker/docker-ce/archive/refs/tags/v19.03.14.zip"

    print(f"\nStarted activity {arg}")
    start_time = time.time()
    try:
        requests.get(download_url)
    except CancelledError as e:
        # Report how long the activity ran before the cancellation arrived.
        run_time = time.time() - start_time
        cancel_message = f"\n[yellow]CancelledError in activity {arg} after {run_time:.2f}s:[/yellow] {e!s}"
        print(cancel_message)
        return None
    else:
        print(f"\n[blue]Completed activity {arg}[/blue]")
        return f"result {arg}"
@workflow.defn
class Workflow:
    """Workflow that runs ``slow_activity`` once under a short timeout."""

    @workflow.run
    async def run(self, arg: str):
        """Start ``slow_activity`` with ``arg`` and return its awaited result.

        The activity is started with a 1.2 second schedule-to-close timeout,
        which is shorter than the ~4 seconds the download is expected to take.
        """
        activity_timeout = timedelta(seconds=1.2)
        activity_handle = workflow.start_activity(
            slow_activity, arg, schedule_to_close_timeout=activity_timeout
        )
        return await activity_handle
async def test_define_activity__handles_timeouts__worker_edition():
    """Run five workflows whose activity outlives its timeout and print outcomes.

    Starts a time-skipping ``WorkflowEnvironment`` and a ``Worker`` that runs
    ``slow_activity`` on a five-thread executor, starts five ``Workflow``
    executions (ids ``test1`` .. ``test5``), then awaits each result in start
    order. A ``WorkflowFailureError`` is printed with its cause; a successful
    result is printed as-is. Finally the coroutine pauses for five seconds
    before the worker is shut down.
    """
    # Local import so this block does not depend on a file-level import.
    import asyncio

    task_queue = "test"
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        async with await WorkflowEnvironment.start_time_skipping() as env:
            async with Worker(
                client=env.client,
                activity_executor=executor,
                task_queue=task_queue,
                activities=[slow_activity],
                workflows=[Workflow],
                workflow_runner=UnsandboxedWorkflowRunner(),
                max_concurrent_activities=5,
            ):
                # Start all workflows before awaiting any of them so the
                # activities run concurrently.
                handles = [
                    await env.client.start_workflow(
                        Workflow.run,
                        f"workflow{i}",
                        task_queue=task_queue,
                        id=f"test{i}",
                    )
                    for i in range(1, 6)
                ]
                for handle in handles:
                    try:
                        result = await handle.result()
                    except WorkflowFailureError as e:
                        print(f"\n[red]Workflow {handle.id} failed:[/red] {e.cause!s}")
                    else:
                        print(f"\n[green]{result}[/green]")
                # Pause without blocking the event loop: a blocking
                # time.sleep() here would stall every coroutine on this loop,
                # including the worker, for the whole five seconds.
                # NOTE(review): the pause is assumed to belong after the
                # result loop, inside the worker context - confirm.
                await asyncio.sleep(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment