Skip to content

Instantly share code, notes, and snippets.

@digitalronin
Created March 29, 2023 05:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save digitalronin/746c4f870edb58997f84e648e336bf52 to your computer and use it in GitHub Desktop.
Save digitalronin/746c4f870edb58997f84e648e336bf52 to your computer and use it in GitHub Desktop.
import asyncio
import prefect # v2.8.7
from prefect.client import get_client
from prefect import flow, get_run_logger, task, unmapped
"""
Usage:
1. Authenticate to Prefect Cloud
2. Save this script and execute, e.g. `python one-at-a-time.py`
The output is log lines like the following; note the roughly 30-second gap between consecutive task runs:
```
$ python one-at-a-time.py
14:31:12.136 | INFO | prefect.engine - Created flow run 'placid-potoo' for flow 'One at a time test'
14:31:13.879 | INFO | Flow run 'placid-potoo' - Starting flow for list of 10 items
...
14:31:14.835 | INFO | Flow run 'placid-potoo' - Created task run 'mytask-0' for task 'mytask'
14:31:14.836 | INFO | Flow run 'placid-potoo' - Submitted task run 'mytask-0' for execution.
14:31:15.246 | INFO | Task run 'mytask-9' - Starting mytask for item 9
14:31:15.536 | INFO | Task run 'mytask-9' - Finished in state Completed()
14:31:45.566 | INFO | Task run 'mytask-3' - Starting mytask for item 3 # <--- 30 second interval
14:31:45.862 | INFO | Task run 'mytask-3' - Finished in state Completed()
14:32:15.993 | INFO | Task run 'mytask-8' - Starting mytask for item 8 # <--- 30 second interval
14:32:16.339 | INFO | Task run 'mytask-8' - Finished in state Completed()
14:32:46.481 | INFO | Task run 'mytask-7' - Starting mytask for item 7 # <--- 31 second interval
14:32:46.855 | INFO | Task run 'mytask-7' - Finished in state Completed()
...
14:35:49.529 | INFO | Flow run 'placid-potoo' - Finished in state Completed('All states completed.')
```
"""
async def set_concurrency_limit(tag: str, limit: int) -> None:
    """Create a tag-based concurrency limit through the Prefect client.

    Args:
        tag: Task tag the limit is registered for.
        limit: Value forwarded as ``concurrency_limit`` for that tag.
    """
    async with get_client() as client:
        # The value returned by the client call was never read, so it is
        # no longer bound to a local name.
        await client.create_concurrency_limit(tag=tag, concurrency_limit=limit)
@flow(name="One at a time test")
def myflow():
    """Map ``mytask`` over the integers 0..9 and return the mapped results."""
    item_count = 10
    items = [*range(item_count)]
    get_run_logger().info(f"Starting flow for list of {len(items)} items")
    # One task run is submitted per element of ``items``.
    return mytask.map(items)
# Without the matching tag value, tasks run with no delays
@task(tags=["one-at-a-time"])
def mytask(item: int) -> int:
    """Log that processing of ``item`` has started and return it unchanged."""
    get_run_logger().info(f"Starting mytask for item {item}")
    return item
if __name__ == "__main__":
    # Register the limit for the tag used by ``mytask`` before starting the
    # flow, so the mapped task runs are subject to it.
    limited_tag, max_concurrent = "one-at-a-time", 1
    asyncio.run(set_concurrency_limit(limited_tag, max_concurrent))
    myflow()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment