Skip to content

Instantly share code, notes, and snippets.

@squeaky-pl
Created August 30, 2016 17:16
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save squeaky-pl/7d05ffff36ec384c6215c1b5352ef3b8 to your computer and use it in GitHub Desktop.
Save squeaky-pl/7d05ffff36ec384c6215c1b5352ef3b8 to your computer and use it in GitHub Desktop.
test.py
import asyncio as aio
import signal
import os
from functools import partial
from datetime import datetime, timedelta
# Alerts that are scheduled as soon as the program starts, keyed by alert id.
# Only the time-of-day part of each datetime is used when scheduling.
initial_alerts = {
    1: datetime(2015, 12, 12, 12, 13, 4),
    2: datetime(2015, 8, 16, 12, 14, 5),
}
def sleep_calculator(created_at):
    """Yield, endlessly, the number of seconds until the next daily alert.

    The alert is due every day at the hour, minute and second of
    *created_at*; its date part is ignored. The current time is sampled
    anew for every value requested from the generator.

    The target keeps the microseconds of the sampled current time, so each
    yielded delay is a whole number of seconds, and a sample taken exactly
    at the target counts as "already due" and is pushed to the next day.

    Args:
        created_at: datetime providing the time of day of the alert.

    Yields:
        float: seconds from the sampled current time to the next due time.
    """
    one_day = timedelta(days=1)
    while True:
        current = datetime.now()
        remaining = current.replace(
            hour=created_at.hour,
            minute=created_at.minute,
            second=created_at.second,
        ) - current
        # Today's slot is now or in the past: aim for tomorrow instead.
        if remaining <= timedelta(0):
            remaining = remaining + one_day
        yield remaining.total_seconds()
async def send_alert(id):
    """Deliver the alert identified by *id* (here: report it on stdout)."""
    announcement = 'I am sending alert'
    print(announcement, id)
async def wait_for_alert(id, created_at):
    """Send alert *id* every day at the time of day given by *created_at*.

    Runs until its task is cancelled: sleeps for the delay produced by
    ``sleep_calculator``, sends the alert, and repeats. Cancellation is
    absorbed, so the coroutine returns normally instead of raising
    ``CancelledError``; this holds whether the cancel arrives while
    sleeping or while an alert is being sent. An alert that is already
    being sent is allowed to finish before the coroutine returns.

    Args:
        id: identifier of the alert, passed to ``send_alert``.
        created_at: datetime whose time of day schedules the alert.
    """
    print('Starting the wait for', id)
    for delay in sleep_calculator(created_at):
        print('Will wait', delay, 'for', id)
        try:
            # This is cancellable
            await aio.sleep(delay)
        except aio.CancelledError:
            print('Terminating coroutine for', id)
            return
        # This is not cancellable: shield() keeps the send itself running,
        # but the coroutine awaiting the shield still receives the
        # CancelledError. Keep a reference to the send so it can be awaited
        # to completion if that happens.
        sending = aio.ensure_future(send_alert(id))
        try:
            await aio.shield(sending)
        except aio.CancelledError:
            # Cancelled in the middle of a send: finish it, then stop the
            # same way as a cancel during the sleep.
            await sending
            print('Terminating coroutine for', id)
            return
def handle_terminate(terminating):
    """Signal callback: announce the shutdown, then set *terminating*.

    Args:
        terminating: event-like object; its ``set()`` method is called.
    """
    print('Shutting down')
    terminating.set()
def handle_line(tasks, line):
    """Execute one control command.

    Supported commands::

        add <id> <year> <month> <day> [<hour> [<minute> [<second>]]]
        del <id>

    ``add`` schedules a new alert task and stores it in *tasks*; if an
    alert with the same id is already scheduled, the old task is cancelled
    first so it does not keep running unreachable. ``del`` cancels and
    forgets an alert.

    Blank lines, unknown commands and malformed arguments are reported on
    stdout and otherwise ignored, as is a ``del`` for an id that is not
    scheduled; none of them raise.

    Args:
        tasks: dict mapping alert id to its running task; mutated in place.
        line: the command text.
    """
    words = line.split()
    if not words:
        print('What did you mean?')
        return
    cmd, *params = words
    if cmd == 'add':
        try:
            alert_id = int(params[0])
            created_at = datetime(*map(int, params[1:]))
        except (IndexError, ValueError, TypeError, OverflowError):
            # Missing id, non-numeric field, or not a valid date/time.
            print('What did you mean?')
            return
        replaced = tasks.pop(alert_id, None)
        if replaced is not None:
            replaced.cancel()
        tasks[alert_id] = aio.ensure_future(wait_for_alert(alert_id, created_at))
    elif cmd == 'del':
        try:
            alert_id = int(params[0])
        except (IndexError, ValueError):
            print('What did you mean?')
            return
        doomed = tasks.pop(alert_id, None)
        if doomed is None:
            print('No alert with id', alert_id)
        else:
            doomed.cancel()
    else:
        print('What did you mean?')
async def handle_client(clients, tasks, reader, writer):
    """Serve one control connection until it ends or the process shuts down.

    The handler's own task is registered in *clients* for as long as the
    connection is served. Each received line is decoded as ASCII, stripped,
    and passed to ``handle_line``. An empty line or end of stream ends the
    session. If the task is cancelled while waiting for input, a farewell
    is written to the peer and the coroutine returns normally rather than
    raising ``CancelledError``.

    On every exit path the task is removed from *clients* and the writer
    is closed.

    Args:
        clients: list of tasks of the currently served connections;
            mutated in place.
        tasks: dict of alert tasks, forwarded to ``handle_line``.
        reader: stream reader of the connection.
        writer: stream writer of the connection.
    """
    # Task.current_task() was removed in Python 3.9; use the module-level
    # function where it exists and fall back on older interpreters.
    current_task = getattr(aio, 'current_task', None) or aio.Task.current_task
    me = current_task()
    clients.append(me)
    print('New client connected')
    try:
        while True:
            try:
                line = (await reader.readline()).decode('ascii').strip()
            # the process is shutting down but we have clients, disconnect them
            except aio.CancelledError:
                writer.write('Goodbye\n'.encode('ascii'))
                writer.write_eof()
                break
            if not line:
                break
            print('got', line)
            handle_line(tasks, line)
    finally:
        # Release the connection and stop tracking this handler.
        clients.remove(me)
        writer.close()
    print('Client disconnected')
async def run(terminating):
    """Run the alert scheduler and its control server until told to stop.

    Starts one alert task per entry of ``initial_alerts``, opens the
    control server on 127.0.0.1:9999, then waits for *terminating* to be
    set. After that the server is closed, all alert tasks and client
    handlers are cancelled, and everything is awaited.

    Args:
        terminating: event whose ``wait()`` is awaited to learn that the
            program should shut down.
    """
    # initial tasks
    tasks = {}
    for alert_id, created_at in initial_alerts.items():
        tasks[alert_id] = aio.ensure_future(wait_for_alert(alert_id, created_at))
    clients = []

    # start control server
    print('ncat 127.0.0.1 9999')
    server = await aio.start_server(
        partial(handle_client, clients, tasks),
        host='127.0.0.1',
        port=9999,
    )

    await terminating.wait()
    server.close()

    # stop all alert tasks, then disconnect all clients
    for job in list(tasks.values()) + clients:
        job.cancel()

    await aio.gather(*tasks.values(), *clients, server.wait_closed())
def main():
    """Entry point: run the scheduler until the process receives SIGTERM.

    Prints the command that terminates the process, installs a SIGTERM
    handler that sets the shutdown event, and runs ``run`` to completion
    on a dedicated event loop. The loop is closed even if ``run`` raises.
    """
    print('kill -TERM', os.getpid())
    # Create the loop explicitly: get_event_loop() with no current loop is
    # deprecated and raises RuntimeError on recent Python versions.
    loop = aio.new_event_loop()
    aio.set_event_loop(loop)
    try:
        terminating = aio.Event()
        loop.add_signal_handler(signal.SIGTERM, handle_terminate, terminating)
        loop.run_until_complete(run(terminating))
    finally:
        loop.close()
# Start the scheduler only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment