Last active
April 30, 2024 17:12
-
-
Save gfranxman/9c477f92029e38da6857814b206a4bf8 to your computer and use it in GitHub Desktop.
Parallel execution of tasks whose results are yielded in submission order — an execution model for STT (speech-to-text) and TTS (text-to-speech) pipelines.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import random

# Shared FIFO of in-flight asyncio.Tasks: producers append to the tail,
# the consumer pops from the head so results are awaited in submission order.
tasks = []

# Toggle verbose per-event logging throughout this module.
DEBUG=False
async def mock_event_generator():
    """Yield ten synthetic event names, announcing each one on stdout."""
    for index in range(10):
        label = "{}: Event {}".format(index, index)
        print(f"Generated event: {label}")
        yield label
async def mock_event_processor(event):
    """Simulate processing *event* by sleeping a random 1-5 seconds."""
    if DEBUG:
        print(f"Starting event: {event}")
    await asyncio.sleep(random.randint(1, 5))
    if DEBUG:
        print(f"Finished processing event: {event}")
async def parallel_processing_but_sequential_phases():
    """
    Two sequential phases: first schedule a processor task for every event,
    then await the scheduled tasks front-to-back so completion is reported
    in submission order.  Tasks themselves run concurrently.
    """
    global tasks
    source = mock_event_generator()
    # Phase 1: fan out — one task per generated event, queued in arrival order.
    async for event in source:
        scheduled = asyncio.create_task(mock_event_processor(event), name=event)
        tasks.append(scheduled)  # accumulate tasks in order
        if DEBUG:
            print(f"Queued {scheduled}")
    # Phase 2: drain the queue oldest-first.
    while tasks:
        scheduled = tasks.pop(0)
        print(f"awaiting {scheduled}")
        await scheduled
        print(f"Result = {scheduled}")
# lets try to make those stages parallel. | |
async def accumulate_events_into_tasks(event_generator):
    """Consume *event_generator*, scheduling one processor task per event.

    Each task is appended to the shared global ``tasks`` list so that the
    consumer side can await them in arrival order.
    """
    global tasks
    async for event in event_generator:
        scheduled = asyncio.create_task(mock_event_processor(event), name=event)
        tasks.append(scheduled)  # accumulate tasks in order
        if DEBUG:
            print(f"Queued {scheduled}")
async def parallel_processing_in_order():
    """Await the shared ``tasks`` queue oldest-first until it is empty.

    Returns as soon as the queue drains, even if a producer is about to
    add more work — the supervisor is expected to restart this coroutine.
    """
    global tasks
    while tasks:
        oldest = tasks.pop(0)
        print(f"awaiting {oldest}")
        await oldest
        print(f"Result = {oldest}")
# Now to run the accumulated_events_into_tasks in parallel with parallel_processing_in_order | |
async def parallel_processing():
    """
    Supervise the producer (event generation) and the consumer (in-order
    awaiting) concurrently, restarting whichever side finishes so the
    pipeline runs indefinitely.

    NOTE(review): this coroutine never returns by design — cancel it or
    stop the event loop to shut the pipeline down.
    """
    event_generator = mock_event_generator()
    producer = asyncio.create_task(accumulate_events_into_tasks(event_generator))
    consumer = asyncio.create_task(parallel_processing_in_order())
    while True:
        # Wake as soon as either side finishes; the other may still be running.
        await asyncio.wait([producer, consumer], return_when=asyncio.FIRST_COMPLETED)
        if producer.done():
            print("resetting event generation")
            event_generator = mock_event_generator()
            producer = asyncio.create_task(accumulate_events_into_tasks(event_generator))
        if consumer.done():
            print("resetting processing")
            # Bug fix: the consumer exits the moment the shared queue is
            # empty, so restarting it immediately busy-spins this loop while
            # the producer is still scheduling work.  Pause briefly to let
            # the producer enqueue before the consumer looks again.
            await asyncio.sleep(0.01)
            consumer = asyncio.create_task(parallel_processing_in_order())
if __name__ == "__main__":
    # asyncio.run(parallel_processing_but_sequential_phases())
    # NOTE(review): this runs the sequential-phases demo.  The fully parallel
    # supervisor variant is parallel_processing(), but it loops forever by
    # design, which is presumably why it is not invoked here — confirm intent.
    asyncio.run(parallel_processing_but_sequential_phases())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment