Skip to content

Instantly share code, notes, and snippets.

@gfranxman
Last active April 30, 2024 17:12
Show Gist options
  • Save gfranxman/9c477f92029e38da6857814b206a4bf8 to your computer and use it in GitHub Desktop.
Save gfranxman/9c477f92029e38da6857814b206a4bf8 to your computer and use it in GitHub Desktop.
Parallel execution of tasks with results in submitted order. Execution model for STT and TTS
import asyncio
import random
# Module-level queue of asyncio tasks, kept in submission order. Producers
# append to the end; consumers pop from the front (index 0).
tasks = []
# When truthy, the coroutines below print extra start/finish/queue messages.
DEBUG=False
async def mock_event_generator(count=10):
    """
    Mock event generator for parallel processing.

    Asynchronously yields ``count`` event names of the form
    ``"<i>: Event <i>"`` for ``i`` in ``range(count)``, printing each name
    just before it is yielded.

    Args:
        count: Number of events to produce. Defaults to 10, the value that
            was previously hard-coded.

    Yields:
        The name of each generated event, in increasing order of ``i``.
    """
    for i in range(count):
        tname = f"{i}: Event {i}"
        print(f"Generated event: {tname}")
        yield tname
async def mock_event_processor(event):
    """
    Mock event processor for parallel processing.

    Simulates work on a single event by sleeping for a random whole number
    of seconds. Start/finish messages are printed only when the module-level
    ``DEBUG`` flag is truthy.

    Args:
        event: The event being processed; only used in the debug messages.

    Returns:
        None.
    """
    if DEBUG:
        print(f"Starting event: {event}")
    # random.randint includes both endpoints, so the delay is 1 to 5 seconds.
    await asyncio.sleep(random.randint(1, 5))
    if DEBUG:
        print(f"Finished processing event: {event}")
async def parallel_processing_but_sequential_phases():
    """
    Process events concurrently, but in two sequential phases.

    Phase 1 consumes ``mock_event_generator()`` and starts one
    ``mock_event_processor`` task per event, appending each task to the
    module-level ``tasks`` list in submission order. Phase 2 starts only
    after the generator is exhausted: it pops tasks from the front of the
    list and awaits them one at a time, so completions are reported in
    submission order even though the tasks themselves run concurrently.
    """
    global tasks
    event_generator = mock_event_generator()

    # Phase 1: turn every event into a running task.
    async for event in event_generator:
        # The event name doubles as the task name, so it shows up in the
        # task's repr in the messages printed below.
        task = asyncio.create_task(mock_event_processor(event), name=event)
        tasks.append(task)  # accumulate tasks in order
        if DEBUG:
            print(f"Queued {task}")

    # Phase 2: report results oldest-first.
    while tasks:
        task = tasks.pop(0)
        print(f"awaiting {task}")
        await task
        print(f"Result = {task}")
# Let's try to make those two stages run in parallel.
async def accumulate_events_into_tasks(event_generator):
    """
    Start a processing task for every event and queue it in order.

    Iterates ``event_generator`` asynchronously, wraps each event in a
    ``mock_event_processor`` task named after the event, and appends the
    task to the module-level ``tasks`` list. Returns once the generator is
    exhausted; it does not wait for the tasks themselves to finish.

    Args:
        event_generator: Async iterable producing the events to process.
    """
    global tasks
    async for event in event_generator:
        task = asyncio.create_task(mock_event_processor(event), name=event)
        tasks.append(task)  # accumulate tasks in order
        if DEBUG:
            print(f"Queued {task}")
async def parallel_processing_in_order():
    """
    Await the queued tasks in submission order.

    Repeatedly pops the oldest task from the front of the module-level
    ``tasks`` list, awaits it, and prints the task before and after the
    await. Returns as soon as the list is found empty, even if another
    coroutine appends more tasks afterwards.
    """
    global tasks
    while tasks:
        task = tasks.pop(0)
        print(f"awaiting {task}")
        await task
        print(f"Result = {task}")
# Now run accumulate_events_into_tasks in parallel with parallel_processing_in_order.
async def parallel_processing():
    """
    Run event accumulation and in-order result processing concurrently.

    Starts ``accumulate_events_into_tasks`` (fed by a fresh
    ``mock_event_generator()``) and ``parallel_processing_in_order`` as two
    tasks. Whenever either one is done it is replaced by a new task of the
    same kind. The loop has no exit condition, so this coroutine never
    returns on its own.
    """
    event_generator = mock_event_generator()
    task1 = asyncio.create_task(accumulate_events_into_tasks(event_generator))
    task2 = asyncio.create_task(parallel_processing_in_order())
    # asyncio.gather(task1, task2) would run each stage only once; waiting
    # for the first completion lets a finished stage be restarted instead.
    while True:
        await asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED)
        if task1.done():
            print("resetting event generation")
            event_generator = mock_event_generator()
            task1 = asyncio.create_task(accumulate_events_into_tasks(event_generator))
        if task2.done():
            print("resetting processing")
            task2 = asyncio.create_task(parallel_processing_in_order())
if __name__ == "__main__":
    # Runs the two-phase variant, which terminates once every task has been
    # awaited.
    asyncio.run(parallel_processing_but_sequential_phases())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment