Last active
March 4, 2021 18:25
-
-
Save emrahgunduz/1c8c7c87981e5dffb11d5d780b78191b to your computer and use it in GitHub Desktop.
Python - Create a simple health check server (async)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
import threading | |
from aiohttp import web | |
def healthcheck_response ( data, *, text=None, body=None, status=200, reason=None, headers=None, content_type="application/json", dumps=json.dumps ): | |
return web.json_response( | |
data, | |
text=text, | |
body=body, | |
status=status, | |
reason=reason, | |
headers=headers, | |
content_type=content_type, | |
dumps=dumps, | |
) | |
def aiohttp_server ( handler ): | |
app = web.Application() | |
app.add_routes( [ web.get( '/', handler ) ] ) | |
runner = web.AppRunner( app ) | |
return runner | |
def run_server ( runner, host, port ): | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop( loop ) | |
loop.run_until_complete( runner.setup() ) | |
site = web.TCPSite( runner, host, port ) | |
loop.run_until_complete( site.start() ) | |
print( f'Health check server is up and running http://{host}:{port}' ) | |
loop.run_forever() | |
async def default_handler (): | |
response_content = {'status': 'OK'} | |
headers = {'server': 'Health Check Service/1.0'} | |
return healthcheck_response( response_content, status=200, headers=headers ) | |
def start_healthcheck_server ( handler=None, host="0.0.0.0", port=8082 ): | |
print( 'Starting health check server' ) | |
server = aiohttp_server( handler if handler is not None else default_handler ) | |
http_thread = threading.Thread( target=run_server, args=(server, host, port,) ) | |
http_thread.daemon = True | |
http_thread.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from health_check import start_healthcheck_server, healthcheck_response | |
async def custom_handler ( _ ): | |
response_content = {'status': 'OK'} | |
headers = {'server': 'My Service/1.0'} | |
return healthcheck_response( response_content, status=200, headers=headers ) | |
default_host = '127.0.0.1' | |
default_hc_port = 8000 | |
start_healthcheck_server( handler=custom_handler, host=default_host, port=default_hc_port ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment