Skip to content

Instantly share code, notes, and snippets.

@Holi0317
Created February 3, 2024 15:40
Show Gist options
  • Save Holi0317/770847fb0c3b43cfd9864739df5c1a23 to your computer and use it in GitHub Desktop.
Save Holi0317/770847fb0c3b43cfd9864739df5c1a23 to your computer and use it in GitHub Desktop.
Temporal workflow with asyncio.Semaphore
import asyncio
from datetime import timedelta
from typing import List
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
@activity.defn
async def say_hello_activity(name: str) -> str:
await asyncio.sleep(1)
return f"Hello, {name}!"
@workflow.defn
class SayHelloWorkflow:
semaphore: asyncio.Semaphore
def __init___(self) -> None:
# Initialize the semaphore here. But it will get replaced when workflow
# execute.
# Purpose of this init is to proof pyright/mypy we always got an
# initialized value.
self.semaphore = asyncio.Semaphore(1)
@workflow.run
async def run(self) -> List[str]:
# Hard-coding to 50 here. But you can use workflow input to initialize
# the semaphore
self.semaphore = asyncio.Semaphore(10)
tasks = [self._task() for _ in range(50)]
results = await asyncio.gather(*tasks)
return list(sorted(results))
async def _task(self):
async with self.semaphore:
return await workflow.execute_activity(
say_hello_activity, "user1", start_to_close_timeout=timedelta(minutes=1)
)
async def main():
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-parallel-activity-task-queue",
workflows=[SayHelloWorkflow],
activities=[say_hello_activity],
):
# While the worker is running, use the client to run the workflow and
# print out its result. Note, in many production setups, the client
# would be in a completely separate process from the worker.
result = await client.execute_workflow(
SayHelloWorkflow.run,
id="hello-parallel-activity-workflow-id",
task_queue="hello-parallel-activity-task-queue",
)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment