Created
June 15, 2023 07:27
-
-
Save napsternxg/e049229b62460ecc1579a7a09cbe4e44 to your computer and use it in GitHub Desktop.
Queued Map with retries
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, jsonify, request, render_template | |
from queued_map import example_items | |
app = Flask(__name__) | |
@app.get("/") | |
@app.get("/<int:n>") | |
def home(n: int=10): | |
output = example_items(n) | |
return jsonify(output) | |
if __name__ == "__main__": | |
app.run() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque | |
from random import random | |
import logging | |
# logger = logging.Logger(__name__, level=logging.ERROR) | |
logging.basicConfig(level=logging.DEBUG) | |
# logging. | |
def func(i): | |
r = random() | |
if r < 0.5: | |
raise RuntimeError(f"Random Error: {r=:.3f}") | |
return i**2 | |
def queued_map(func, *iterables, max_retries=3): | |
items_iter = iter(zip(*iterables)) | |
q = deque([]) | |
exhausted = False | |
# Add one item to q | |
def _add_item_to_queue(): | |
try: | |
item = next(items_iter) | |
q.append((item, max_retries)) | |
return False | |
except StopIteration: | |
logging.debug(f"\t> Exhausted items_iter {items_iter=}") | |
return True | |
exhausted = _add_item_to_queue() | |
if exhausted: | |
raise StopIteration | |
# Start processing q | |
logging.debug(f"Starting: {len(q)=}, {q=}") | |
while q: | |
item, retries_left = q.popleft() | |
logging.debug(f"{item=}, {retries_left=}, {len(q)=}, {q=}") | |
try: | |
out = func(*item) | |
yield out | |
except RuntimeError as e: | |
logging.debug(f"Reading item {item}, {retries_left=}. Error: {e}") | |
if retries_left > 0: | |
q.appendleft((item, retries_left-1)) | |
else: | |
logging.debug(f"\t> Exhausted retries={retries_left=}") | |
yield None | |
# Add next item to q | |
if not exhausted: | |
exhausted = _add_item_to_queue() | |
def example_items(n=10): | |
output = [] | |
items = range(n) | |
for item, out in zip(items, queued_map(func, items)): | |
logging.debug(f"Returned {item=}, {out=}") | |
output.append((item, out)) | |
logging.debug(output) | |
logging.debug(sorted(output, key=lambda x: x[0])) | |
return output | |
if __name__ == "__main__": | |
example_items(n=10) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment