Skip to content

Instantly share code, notes, and snippets.

@emrahgunduz
Last active March 4, 2021 18:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save emrahgunduz/1c8c7c87981e5dffb11d5d780b78191b to your computer and use it in GitHub Desktop.
Save emrahgunduz/1c8c7c87981e5dffb11d5d780b78191b to your computer and use it in GitHub Desktop.
Python - Create a simple health check server (async)
import asyncio
import json
import threading
from aiohttp import web
def healthcheck_response(
    data,
    *,
    text=None,
    body=None,
    status=200,
    reason=None,
    headers=None,
    content_type="application/json",
    dumps=json.dumps,
):
    """Build a JSON response for a health-check handler.

    Thin wrapper around ``aiohttp.web.json_response``: ``data`` is passed
    positionally and every keyword option is forwarded unchanged.

    Args:
        data: Payload handed to ``web.json_response``.
        text: Forwarded as ``text``.
        body: Forwarded as ``body``.
        status: HTTP status code (defaults to 200).
        reason: Forwarded as ``reason``.
        headers: Extra response headers, or ``None``.
        content_type: Response content type (defaults to JSON).
        dumps: Serializer callable (defaults to ``json.dumps``).

    Returns:
        Whatever ``web.json_response`` returns for these arguments.
    """
    options = {
        "text": text,
        "body": body,
        "status": status,
        "reason": reason,
        "headers": headers,
        "content_type": content_type,
        "dumps": dumps,
    }
    return web.json_response(data, **options)
def aiohttp_server(handler):
    """Create an aiohttp application runner serving ``handler`` on ``GET /``.

    Args:
        handler: Request handler registered for the root path.

    Returns:
        A ``web.AppRunner`` wrapping the new application; it has not been
        set up or started yet.
    """
    application = web.Application()
    root_route = web.get('/', handler)
    application.add_routes([root_route])
    return web.AppRunner(application)
def run_server(runner, host, port):
    """Run the health-check site on a fresh event loop until the loop stops.

    Creates a new event loop, installs it as the current loop for the calling
    thread, sets up ``runner``, binds a TCP site to ``host``/``port`` and then
    blocks in ``loop.run_forever()``.

    Args:
        runner: The ``web.AppRunner`` to set up and serve.
        host: Interface address to bind.
        port: TCP port to bind.

    Raises:
        Any exception raised while setting up the runner or starting the site
        (for example when the port is already in use) propagates to the
        caller after the runner and loop have been released.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(runner.setup())
        try:
            site = web.TCPSite(runner, host, port)
            loop.run_until_complete(site.start())
            print(f'Health check server is up and running http://{host}:{port}')
            loop.run_forever()
        finally:
            # Release the listening socket and application resources whether
            # startup failed or the loop was stopped from elsewhere.
            loop.run_until_complete(runner.cleanup())
    finally:
        # The loop was created here, so it is closed here as well.
        loop.close()
async def default_handler(request=None):
    """Answer a health-check request with a fixed ``{'status': 'OK'}`` body.

    aiohttp invokes route handlers with the incoming request as the single
    positional argument, so the handler must accept it; the request is not
    inspected. The parameter is optional so a bare ``default_handler()`` call
    keeps working.

    Args:
        request: The incoming request object (unused).

    Returns:
        The response built by ``healthcheck_response`` with status 200 and a
        ``server`` header of ``Health Check Service/1.0``.
    """
    response_content = {'status': 'OK'}
    headers = {'server': 'Health Check Service/1.0'}
    return healthcheck_response(response_content, status=200, headers=headers)
def start_healthcheck_server(handler=None, host="0.0.0.0", port=8082):
    """Start the health-check HTTP server in a background daemon thread.

    Args:
        handler: Request handler for ``GET /``; ``default_handler`` is used
            when this is ``None``.
        host: Interface address to bind (defaults to all interfaces).
        port: TCP port to bind (defaults to 8082).
    """
    print('Starting health check server')
    if handler is None:
        handler = default_handler
    runner = aiohttp_server(handler)
    # Daemon thread: it does not keep the process alive on its own.
    worker = threading.Thread(
        target=run_server,
        args=(runner, host, port),
        daemon=True,
    )
    worker.start()
from health_check import start_healthcheck_server, healthcheck_response
async def custom_handler(_):
    """Example health-check handler reporting ``{'status': 'OK'}``.

    The single argument supplied by the caller is ignored. The response
    carries status 200 and a ``server`` header of ``My Service/1.0``.
    """
    return healthcheck_response(
        {'status': 'OK'},
        status=200,
        headers={'server': 'My Service/1.0'},
    )
# Example wiring: serve the health check on the loopback interface only.
default_host = '127.0.0.1'
# TCP port the example health-check endpoint listens on.
default_hc_port = 8000
# Start the health-check server with the custom handler defined above.
start_healthcheck_server( handler=custom_handler, host=default_host, port=default_hc_port )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment