Skip to content

Instantly share code, notes, and snippets.

@jomido
Created March 15, 2017 19:39
Show Gist options
  • Save jomido/039ccb2b5f983ad98af16909da6b7922 to your computer and use it in GitHub Desktop.
Save jomido/039ccb2b5f983ad98af16909da6b7922 to your computer and use it in GitHub Desktop.
Asyncio Sync Counter
import asyncio
class Counter(object):
    """Asyncio rate limiter allowing ``max`` acquisitions per time window.

    Every ``duration`` seconds the event loop calls :meth:`tick`, which
    zeroes ``count`` and completes the ``reset`` future so that coroutines
    blocked in :meth:`increase` can try again.

    The instance is also an async context manager: ``async with counter:``
    waits for a free slot on entry and does nothing on exit.

    Attributes:
        max: Maximum number of acquisitions per window.
        duration: Window length in seconds (passed to ``loop.call_later``).
        loop: Event loop that owns the timer and the ``reset`` future.
        count: Acquisitions made in the current window.
        reset: Future completed at the end of the current window.
    """

    def __init__(self, max=500, duration=100, loop=None):
        """Create the counter and schedule the first window reset.

        Args:
            max: Maximum number of acquisitions per window.
            duration: Window length in seconds.
            loop: Event loop to use; defaults to ``asyncio.get_event_loop()``.
        """
        self.max = max
        self.duration = duration
        self.loop = asyncio.get_event_loop() if loop is None else loop
        self.count = 0
        # Bind the future to the counter's own loop rather than to whatever
        # loop happens to be current when the constructor runs.
        self.reset = self.loop.create_future()
        # Must go through self.loop: the ``loop`` argument is None whenever
        # the caller relies on the default.
        self.loop.call_later(self.duration, self.tick)

    def tick(self):
        """Start a new window: zero the count and release current waiters."""
        finished = self.reset
        self.count = 0
        self.reset = self.loop.create_future()
        # Re-arm the timer before touching the old future so the periodic
        # reset keeps running whatever state that future is in.
        self.loop.call_later(self.duration, self.tick)
        # The old future may already be done (e.g. cancelled by outside
        # code); set_result() on a done future raises InvalidStateError.
        if not finished.done():
            finished.set_result(None)

    async def increase(self):
        """Take one slot, waiting for the next window(s) while the limit is hit."""
        while self.count >= self.max:
            # shield() keeps the cancellation of one waiting task from
            # cancelling the future that every other waiter shares.
            await asyncio.shield(self.reset)
        self.count += 1

    async def __aenter__(self):
        """Wait for a free slot, then return the counter itself."""
        await self.increase()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Do nothing on exit; exceptions from the body propagate."""
        return None
import asyncio
from counter import Counter
async def main(loop):
    """Demo: print forever, throttled to two messages per three seconds.

    Args:
        loop: Event loop handed to the ``Counter`` that does the throttling.
    """
    throttle = Counter(loop=loop, duration=3, max=2)
    while True:
        # Entering the context waits until the counter has a free slot.
        async with throttle:
            print("I'm working!")
# Script entry point: run the demo coroutine on the default event loop.
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # main() loops forever, so this call only ends on an error or interrupt.
    loop.run_until_complete(main(loop))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment