Skip to content

Instantly share code, notes, and snippets.

@squeaky-pl
Created August 30, 2016 15:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save squeaky-pl/3122928a74931f5ba162fba43c57c6ff to your computer and use it in GitHub Desktop.
Save squeaky-pl/3122928a74931f5ba162fba43c57c6ff to your computer and use it in GitHub Desktop.
import asyncio as aio
import signal
import os
from datetime import datetime, timedelta
# Alert schedule, keyed by alert id.
# sleep_calculator() reads only the hour, minute and second of each value;
# the date part of these datetimes is not consulted there.
alerts = {
1: datetime(2015, 12, 12, 11, 57, 4),
2: datetime(2015, 8, 16, 11, 58, 5)
}
def sleep_calculator(id):
    """Yield, forever, the seconds left until alert ``id`` is next due.

    Only the time of day of ``alerts[id]`` is used. Every value is measured
    from the moment it is requested to the next occurrence of that time of
    day: later today if it is still ahead, otherwise tomorrow.

    Args:
        id: Key into the module-level ``alerts`` mapping.

    Yields:
        A positive ``float`` number of seconds, at most one day.

    Raises:
        KeyError: On iteration, if ``id`` is not present in ``alerts``.
    """
    while True:
        now = datetime.now()
        alert = alerts[id]
        # Replace the microseconds as well: otherwise the target inherits the
        # sub-second part of ``now`` and lands up to a second after the
        # configured time of day.
        next_send = now.replace(
            hour=alert.hour,
            minute=alert.minute,
            second=alert.second,
            microsecond=alert.microsecond,
        )
        if next_send <= now:
            # Today's slot has already passed (or is exactly now).
            next_send += timedelta(days=1)
        yield (next_send - now).total_seconds()
async def send_alert(id):
    """Report on stdout that the alert identified by ``id`` is being sent."""
    announcement = ('I am sending alert', id)
    print(*announcement)
async def wait_for_alert(id):
    """Repeatedly sleep until alert ``id`` is due, then send it.

    The delays come from ``sleep_calculator(id)``. Cancelling the task while
    it sleeps ends the coroutine quietly. Cancelling it while an alert is
    being sent lets that send finish first and then ends the coroutine the
    same way, so the cancellation never escapes as ``CancelledError``.

    Args:
        id: Alert identifier, passed to ``sleep_calculator`` and
            ``send_alert``.
    """
    print('Starting the wait for', id)
    for delay in sleep_calculator(id):
        print('Will wait', delay, 'for', id)
        try:
            # This is cancellable
            await aio.sleep(delay)
        except aio.CancelledError:
            print('Terminating coroutine for', id)
            return
        # This is not cancellable: keep a handle on the send so it can still
        # be awaited if the shield is interrupted by a cancellation.
        sending = aio.ensure_future(send_alert(id))
        try:
            await aio.shield(sending)
        except aio.CancelledError:
            if sending.cancelled():
                # The send itself was cancelled; there is nothing to wait for.
                raise
            # This task was cancelled mid-send: shield() kept the send alive,
            # so let it complete before shutting down.
            await sending
            print('Terminating coroutine for', id)
            return
def handle_terminate(tasks):
    """Announce the shutdown and request cancellation of every given task.

    Args:
        tasks: Iterable of objects with a ``cancel()`` method.
    """
    print('Shutting down')
    for pending in tasks:
        pending.cancel()
def main():
    """Start one waiter task per configured alert and run until all finish.

    Prints a ready-made ``kill -TERM <pid>`` command, installs a SIGTERM
    handler that cancels every waiter task, and blocks until all of the
    tasks have completed. The event loop is always closed on the way out.
    """
    print('kill -TERM', os.getpid())
    # Create the loop explicitly: relying on get_event_loop()/ensure_future()
    # to create one implicitly is deprecated and fails on recent Pythons.
    loop = aio.new_event_loop()
    aio.set_event_loop(loop)
    try:
        tasks = [loop.create_task(wait_for_alert(alert_id)) for alert_id in alerts]
        loop.add_signal_handler(signal.SIGTERM, handle_terminate, tasks)
        loop.run_until_complete(aio.gather(*tasks))
    finally:
        # Release the loop's resources even if a task raised.
        loop.close()
# Script entry point: start the alert scheduler only when executed directly.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment