Created
February 3, 2024 15:40
-
-
Save Holi0317/770847fb0c3b43cfd9864739df5c1a23 to your computer and use it in GitHub Desktop.
Temporal workflow with asyncio.Semaphore
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from datetime import timedelta | |
from typing import List | |
from temporalio import activity, workflow | |
from temporalio.client import Client | |
from temporalio.worker import Worker | |
@activity.defn | |
async def say_hello_activity(name: str) -> str: | |
await asyncio.sleep(1) | |
return f"Hello, {name}!" | |
@workflow.defn | |
class SayHelloWorkflow: | |
semaphore: asyncio.Semaphore | |
def __init___(self) -> None: | |
# Initialize the semaphore here. But it will get replaced when workflow | |
# execute. | |
# Purpose of this init is to proof pyright/mypy we always got an | |
# initialized value. | |
self.semaphore = asyncio.Semaphore(1) | |
@workflow.run | |
async def run(self) -> List[str]: | |
# Hard-coding to 50 here. But you can use workflow input to initialize | |
# the semaphore | |
self.semaphore = asyncio.Semaphore(10) | |
tasks = [self._task() for _ in range(50)] | |
results = await asyncio.gather(*tasks) | |
return list(sorted(results)) | |
async def _task(self): | |
async with self.semaphore: | |
return await workflow.execute_activity( | |
say_hello_activity, "user1", start_to_close_timeout=timedelta(minutes=1) | |
) | |
async def main(): | |
# Start client | |
client = await Client.connect("localhost:7233") | |
# Run a worker for the workflow | |
async with Worker( | |
client, | |
task_queue="hello-parallel-activity-task-queue", | |
workflows=[SayHelloWorkflow], | |
activities=[say_hello_activity], | |
): | |
# While the worker is running, use the client to run the workflow and | |
# print out its result. Note, in many production setups, the client | |
# would be in a completely separate process from the worker. | |
result = await client.execute_workflow( | |
SayHelloWorkflow.run, | |
id="hello-parallel-activity-workflow-id", | |
task_queue="hello-parallel-activity-task-queue", | |
) | |
print(f"Result: {result}") | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment