Skip to content

Instantly share code, notes, and snippets.

@bencord0
Created October 9, 2016 00:08
Show Gist options
  • Save bencord0/fd531ada6c2ca87a972971de8a8c9d79 to your computer and use it in GitHub Desktop.
Save bencord0/fd531ada6c2ca87a972971de8a8c9d79 to your computer and use it in GitHub Desktop.
asyncio (dumb TCP) load balancer
#!/usr/bin/env python3
"""
usage: lb.py listening_port backend_host backend_port
Only specify one backend host/port. It is expected that
other backends can be discovered by DNS at connect time.
Useful if, say, your backends are on fully dynamic addresses,
but you have a sensible DNS server in front of them.
"""
import argparse
import asyncio
import concurrent.futures
import logging
import signal
import socket
# Module-level state shared by the server coroutine and the shutdown code.

# Event loop that drives the proxy.  get_event_loop() raises RuntimeError
# when no current loop exists (e.g. outside the main thread, or on Python
# releases that no longer create one implicitly), so fall back to creating
# and installing a loop explicitly.
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

logging.getLogger('asyncio').setLevel(logging.DEBUG)

# Logger for this script's own diagnostics (backend health failures, ...).
logger = logging.getLogger(__name__)

# Set once at shutdown; the heartbeat and the proxy loops poll or wait on it.
heartbeat = asyncio.Event()

# Command line: all three arguments are required positionals.  Ports are
# converted to int so that a typo is rejected by argparse up front instead
# of surfacing later as a socket error.
parser = argparse.ArgumentParser()
parser.add_argument('port', type=int,
                    help='TCP port to listen on')
parser.add_argument('backend_host',
                    help='host name of the backend, resolved at connect time')
parser.add_argument('backend_port', type=int,
                    help='TCP port of the backend')
async def make_server():
    """Start the TCP load balancer and return the listening server.

    Parses the command line (listening port, backend host, backend port),
    schedules a background heartbeat that probes the backend, and starts
    accepting frontend connections.  Every accepted frontend connection
    gets its own freshly opened backend connection.

    Returns:
        The server object created by ``asyncio.start_server``.
    """
    args = parser.parse_args()
    print(args)
    # Looked up locally so this function does not rely on a module-level
    # ``logger`` name being defined.
    log = logging.getLogger(__name__)

    async def ping_backend():
        """Probe the backend every 5 seconds; stop the event loop on failure."""
        while True:
            try:
                # The heartbeat event doubles as the sleep timer: a timeout
                # means "not shutting down yet, time for the next probe".
                await asyncio.wait_for(heartbeat.wait(), 5)
            except asyncio.TimeoutError:
                pass
            else:
                # Shutdown was requested.  The caller is already tearing the
                # loop down, so stopping it here could abort that teardown.
                return
            # The probe has its own try block: exceptions raised inside an
            # ``except`` handler are not seen by sibling handlers.
            try:
                # Simple DNS/TCP open check
                _probe_reader, probe_writer = await asyncio.open_connection(
                    args.backend_host, args.backend_port)
            except socket.gaierror:
                # Docker name resolution or service discovery failure
                print("Backend could not be found")
                break
            except Exception:
                log.exception("Backend is not healthy")
                break
            probe_writer.close()
        # Only reached on a failed probe: stop serving.
        loop.stop()

    asyncio.ensure_future(ping_backend())

    async def proxy_bytes(reader, writer):
        """Copy bytes from ``reader`` to ``writer`` until EOF, error or shutdown."""
        try:
            while True:
                try:
                    # Time out regularly so that a shutdown request is
                    # noticed even on an idle connection.
                    buf = await asyncio.wait_for(reader.read(1024), 1)
                except asyncio.TimeoutError:
                    if heartbeat.is_set():
                        print("Closing proxy on shutdown")
                        return
                    continue
                if not buf:
                    break
                writer.write(buf)
                # Back-pressure: do not buffer without bound when the
                # receiving side is slower than the sending side.
                await writer.drain()
        except Exception:
            print("Closing proxy")
            # Closing the transport also ends the opposite direction, which
            # reads from the same connection.
            writer.close()
            return
        # Clean EOF: pass the half-close on so the peer sees end-of-stream
        # while the opposite direction can still deliver its data.
        try:
            if writer.can_write_eof():
                writer.write_eof()
            else:
                writer.close()
        except Exception:
            writer.close()

    # This callback is used on incoming TCP connections to the frontend port
    async def handle_connection(frontend_reader, frontend_writer):
        """Open a backend connection and relay bytes in both directions."""
        # Establish a new TCP connection to the backend
        try:
            backend_reader, backend_writer = await asyncio.open_connection(
                args.backend_host, args.backend_port)
        except OSError:
            # Covers socket.gaierror (service discovery failure) as well as
            # refused or unreachable backends.  Don't hang the client.
            frontend_writer.close()
            return
        print("{} -> {}"
              .format(frontend_writer.transport.get_extra_info('peername'),
                      backend_writer.transport.get_extra_info('peername')))
        # Be a transparent TCP proxy: run both directions and release both
        # sockets once they have finished.
        try:
            await asyncio.gather(
                proxy_bytes(frontend_reader, backend_writer),
                proxy_bytes(backend_reader, frontend_writer))
        finally:
            frontend_writer.close()
            backend_writer.close()

    # Register the server in the event loop.
    server = await asyncio.start_server(handle_connection, port=args.port)
    print("Started server:", [s.getsockname() for s in server.sockets])
    return server
if __name__ == '__main__':
    # Script entry point: start the server, serve until interrupted (or
    # until the loop is stopped), then shut down in an orderly fashion.
    server = loop.run_until_complete(make_server())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        try:
            print("Shutting down proxy")
            server.close()  # no longer accepting connections

            # Shutdown phase, stop heartbeating and wait for clients to flush.
            heartbeat.set()
            loop.run_until_complete(server.wait_closed())

            # asyncio.all_tasks() replaced Task.all_tasks(); use whichever
            # this interpreter provides.  The loop is not running here, so
            # it has to be passed explicitly.
            all_tasks = (getattr(asyncio, 'all_tasks', None)
                         or asyncio.Task.all_tasks)
            pending = list(all_tasks(loop))

            # Raise CancelledError in all scheduled coroutines, then give
            # them time on the event loop to handle it.
            for task in pending:
                task.cancel()
            if pending:
                # return_exceptions=True collects cancellations and failures
                # as results, so one bad task cannot abort the cleanup.
                outcomes = loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True))
                for outcome in outcomes:
                    if (isinstance(outcome, Exception)
                            and not isinstance(outcome,
                                               asyncio.CancelledError)):
                        logging.getLogger(__name__).error(
                            "Task failed during shutdown: %r", outcome)
        finally:
            loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment