Skip to content

Instantly share code, notes, and snippets.

@petri
Last active September 27, 2020 18:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save petri/3850154424eb41fa822719f89eda8b99 to your computer and use it in GitHub Desktop.
Save petri/3850154424eb41fa822719f89eda8b99 to your computer and use it in GitHub Desktop.
a first come, first served queing predicate for asyncio Condition
class my_turn:
"""a first come, first served coordinator predicate for
asyncio.Condition.wait_for(predicate); could also do away
the 'caller' by just hardcoding it to the current task
"""
queue = []
def __init__(self, caller):
type(self).queue.insert(0, caller)
def __call__(self, caller):
if type(self).queue[-1] == caller:
type(self).queue.pop()
return True
else:
return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment