Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save nathanielobrown/887972a83567e786ace902346b71fbb6 to your computer and use it in GitHub Desktop.
Save nathanielobrown/887972a83567e786ace902346b71fbb6 to your computer and use it in GitHub Desktop.
import asyncio
import concurrent.futures
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
# Set by slow_activity as soon as it begins executing; the test coroutine
# awaits this event before it starts sleeping.
started_event = asyncio.Event()
@activity.defn
async def slow_activity() -> None:
    """Signal that the activity is running, then heartbeat forever.

    Sets the module-level ``started_event`` and then alternates between a
    0.1 second ``asyncio.sleep`` and a call to ``activity.heartbeat()``.
    The loop has no exit condition, so the coroutine never returns on its
    own; it only ends if one of those calls raises (for example when the
    running task is cancelled).
    """
    started_event.set()

    heartbeat_interval = 0.1
    while True:
        await asyncio.sleep(heartbeat_interval)
        activity.heartbeat()
@workflow.defn
class Workflow:
    """Workflow that runs ``slow_activity`` and waits for it to finish."""

    @workflow.run
    async def run(self):
        """Start ``slow_activity`` and return the result of awaiting it.

        The activity is started with a 30-second schedule-to-close timeout.
        Progress messages are printed before and after the activity is
        started.
        """
        print("Starting activity...")
        activity_handle = workflow.start_activity(
            slow_activity,
            schedule_to_close_timeout=timedelta(seconds=30),
        )
        print("Activity started")

        result = await activity_handle
        return result
async def test_time_skipping_past_activity_timeout():
    """Sleep in a time-skipping environment past the activity's timeout.

    Steps:
      1. Start a time-skipping ``WorkflowEnvironment`` and a ``Worker`` that
         serves ``Workflow`` and ``slow_activity`` on the "test" task queue.
      2. Run ``Workflow.run`` via ``execute_workflow`` in a background task.
      3. Wait until ``slow_activity`` sets ``started_event``.
      4. Call ``env.sleep(40)``, which is longer than the 30-second
         schedule-to-close timeout the workflow gives the activity.
      5. Await the background task for the workflow's outcome.
    """
    queue_name = "test"
    async with await WorkflowEnvironment.start_time_skipping() as env, Worker(
        client=env.client,
        task_queue=queue_name,
        activities=[slow_activity],
        workflows=[Workflow],
    ):
        # Begin waiting on the workflow result in the background. Per the
        # original author's note, this is what activates time skipping.
        workflow_call = env.client.execute_workflow(
            Workflow.run,
            id="test",
            task_queue=queue_name,
        )
        workflow_task = asyncio.create_task(workflow_call)

        # Block until the activity reports that it is running.
        await started_event.wait()

        print("Before sleep")
        # Original author's note: this would hang if we had used
        # `env.client.start_workflow` instead of `execute_workflow`.
        await env.sleep(40)
        print("After sleep")

        await workflow_task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment