Created
October 9, 2016 00:08
-
-
Save bencord0/fd531ada6c2ca87a972971de8a8c9d79 to your computer and use it in GitHub Desktop.
asyncio (dumb TCP) load balancer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
usage: lb.py listening_port backend_host backend_port
Specify exactly one backend host/port. Any other backends are expected
to be discovered through DNS at connect time.
Useful if, say, your backends are on fully dynamic addresses
but you have a sensible DNS server in front of them.
""" | |
import argparse | |
import asyncio | |
import concurrent.futures | |
import logging | |
import signal | |
import socket | |
# Create the event loop explicitly and install it as the current loop.
# Calling asyncio.get_event_loop() with no running loop is deprecated and
# fails outright on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

logging.getLogger('asyncio').setLevel(logging.DEBUG)
# Module logger; the backend health check reports failures through it.
logger = logging.getLogger(__name__)

# Set once at shutdown: tells the health check and the proxy coroutines
# to stop.
heartbeat = asyncio.Event()

# All three positionals are required. Values stay strings; they are passed
# straight through to the asyncio connection helpers.
parser = argparse.ArgumentParser()
parser.add_argument('port', help="port to listen on for frontend connections")
parser.add_argument('backend_host', help="host name of the backend (resolved at connect time)")
parser.add_argument('backend_port', help="port of the backend")
async def make_server():
    """Parse the command line, start the frontend listener and the watchdog.

    Uses the module-level ``parser``, ``heartbeat`` and ``loop``.

    Returns:
        The server object created by ``asyncio.start_server``; it is
        already listening when this coroutine returns.
    """
    args = parser.parse_args()
    print(args)
    # Looked up here so the health check never hits an undefined name.
    logger = logging.getLogger(__name__)

    async def ping_backend():
        """Probe the backend every 5 seconds; stop the loop if a probe fails."""
        while True:
            try:
                # The heartbeat event doubles as the probe timer: waiting on
                # it with a timeout sleeps for 5s unless shutdown is signalled.
                await asyncio.wait_for(heartbeat.wait(), 5)
            except asyncio.TimeoutError:
                pass  # no shutdown yet: time for the next probe
            else:
                # Shutdown was requested. Whoever set the event is driving
                # the loop through its teardown, so stopping the loop from
                # here could abort a run_until_complete() call in progress.
                return
            try:
                # Simple DNS/TCP open check
                _, probe_writer = await asyncio.open_connection(
                    args.backend_host, args.backend_port)
            except socket.gaierror:
                # Docker name resolution or service discovery failure
                print("Backend could not be found")
                break
            except Exception:
                # Watchdog boundary: any other failure (refused, unreachable,
                # ...) means the backend is unhealthy.
                logger.exception("Backend is not healthy")
                break
            probe_writer.close()
        # Kill the proxy violently: the main block shuts everything down
        # once run_forever() returns.
        loop.stop()

    # This callback is used on incoming TCP connections to the frontend port
    async def handle_connection(frontend_reader, frontend_writer):
        """Relay one frontend connection to a newly opened backend connection."""
        # Establish a new TCP connection to the backend
        try:
            backend_reader, backend_writer = await asyncio.open_connection(
                args.backend_host, args.backend_port)
        except OSError:
            # socket.gaierror (service discovery failure) is an OSError too,
            # as are refused/unreachable backends. Don't hang the client.
            frontend_writer.close()
            return
        print("{} -> {}".format(
            frontend_writer.get_extra_info('peername'),
            backend_writer.get_extra_info('peername')))

        # Be a transparent TCP proxy
        async def proxy_bytes(reader, writer):
            """Copy reader to writer until EOF or shutdown; errors propagate."""
            while True:
                try:
                    # The 1s timeout only exists to poll the shutdown flag.
                    buf = await asyncio.wait_for(reader.read(1024), 1)
                except asyncio.TimeoutError:
                    if reader.exception() is not None:
                        # A real socket timeout stored on the stream, not
                        # our idle poll: treat it as a connection error.
                        raise
                    if heartbeat.is_set():
                        print("Closing proxy on shutdown")
                        return
                    continue
                if not buf:
                    # Forward the half-close so the peer sees EOF while the
                    # opposite direction can still deliver data.
                    if writer.can_write_eof():
                        writer.write_eof()
                    return
                writer.write(buf)
                # Apply backpressure instead of buffering without bound.
                await writer.drain()

        pumps = [
            asyncio.ensure_future(proxy_bytes(frontend_reader, backend_writer)),
            asyncio.ensure_future(proxy_bytes(backend_reader, frontend_writer)),
        ]
        try:
            # Returns when both directions are finished; raises as soon as
            # either direction fails.
            await asyncio.gather(*pumps)
        except Exception:
            print("Closing proxy")
        finally:
            # Always release both sockets, and stop a direction that is
            # still running after the other one failed.
            for pump in pumps:
                pump.cancel()
            frontend_writer.close()
            backend_writer.close()

    # Register the server in the event loop, then start the watchdog.
    server = await asyncio.start_server(handle_connection, port=args.port)
    print("Started server:", [s.getsockname() for s in server.sockets])
    asyncio.ensure_future(ping_backend())
    return server
if __name__ == '__main__':
    # Start listening, then serve until interrupted or until something
    # stops the loop.
    server = loop.run_until_complete(make_server())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        try:
            print("Shutting down proxy")
            server.close()  # no longer accepting connections

            # Shutdown phase, stop heartbeating and wait for clients to flush.
            heartbeat.set()
            loop.run_until_complete(server.wait_closed())

            # asyncio.Task.all_tasks() no longer exists on current Python;
            # asyncio.all_tasks() is its replacement. Fall back to the old
            # spelling only where the new one is missing.
            all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
            leftovers = list(all_tasks(loop))

            # Raise CancelledError in every coroutine that is still scheduled.
            for task in leftovers:
                task.cancel()
            if leftovers:
                # Give the coroutines time on the event loop to handle the
                # cancellation. return_exceptions=True collects each outcome
                # (including CancelledError) instead of raising, so a single
                # misbehaving task cannot abort the shutdown.
                loop.run_until_complete(
                    asyncio.gather(*leftovers, return_exceptions=True))
        finally:
            loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment