Skip to content

Instantly share code, notes, and snippets.

@napsternxg
Created June 15, 2023 07:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save napsternxg/e049229b62460ecc1579a7a09cbe4e44 to your computer and use it in GitHub Desktop.
Save napsternxg/e049229b62460ecc1579a7a09cbe4e44 to your computer and use it in GitHub Desktop.
Queued Map with retries
from flask import Flask, jsonify, request, render_template
from queued_map import example_items
app = Flask(__name__)
@app.get("/")
@app.get("/<int:n>")
def home(n: int=10):
output = example_items(n)
return jsonify(output)
if __name__ == "__main__":
app.run()
from collections import deque
from random import random
import logging
# logger = logging.Logger(__name__, level=logging.ERROR)
logging.basicConfig(level=logging.DEBUG)
# logging.
def func(i):
r = random()
if r < 0.5:
raise RuntimeError(f"Random Error: {r=:.3f}")
return i**2
def queued_map(func, *iterables, max_retries=3):
items_iter = iter(zip(*iterables))
q = deque([])
exhausted = False
# Add one item to q
def _add_item_to_queue():
try:
item = next(items_iter)
q.append((item, max_retries))
return False
except StopIteration:
logging.debug(f"\t> Exhausted items_iter {items_iter=}")
return True
exhausted = _add_item_to_queue()
if exhausted:
raise StopIteration
# Start processing q
logging.debug(f"Starting: {len(q)=}, {q=}")
while q:
item, retries_left = q.popleft()
logging.debug(f"{item=}, {retries_left=}, {len(q)=}, {q=}")
try:
out = func(*item)
yield out
except RuntimeError as e:
logging.debug(f"Reading item {item}, {retries_left=}. Error: {e}")
if retries_left > 0:
q.appendleft((item, retries_left-1))
else:
logging.debug(f"\t> Exhausted retries={retries_left=}")
yield None
# Add next item to q
if not exhausted:
exhausted = _add_item_to_queue()
def example_items(n=10):
output = []
items = range(n)
for item, out in zip(items, queued_map(func, items)):
logging.debug(f"Returned {item=}, {out=}")
output.append((item, out))
logging.debug(output)
logging.debug(sorted(output, key=lambda x: x[0]))
return output
if __name__ == "__main__":
example_items(n=10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment