Created
November 11, 2018 21:15
-
-
Save diogommartins/b6bc949dbc350c0d32ba041ca608134e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from typing import List | |
from aiohttp import web | |
from healthchecker import check | |
from asyncworker import App, Options | |
from asyncworker.rabbitmq import RabbitMQMessage | |
from healthchecker.async_checker import AsyncCheckCase | |
app = App(host="localhost", user="guest", password="guest", prefetch_count=1024) | |
@app.http_route('GET', '/') | |
async def index(request: web.Request) -> web.Response: | |
return web.Response(body="Hello world") | |
@app.http_route('GET', '/healthcheck') | |
class HealthCheck(AsyncCheckCase, web.View): | |
@property | |
def loop(self) -> asyncio.AbstractEventLoop: | |
return self.request.app.loop | |
async def get(self) -> 'web.Response': | |
""" | |
Should return 200 if all dependencies are ok, 500 otherwise. | |
:returns: A HTTP response with True or False for each check | |
""" | |
await self.check() | |
status_code = 200 if self.has_succeeded() else 500 | |
return web.json_response(data=self.check_report, status=status_code) | |
@check | |
async def validate_xablau(self): | |
return True | |
@check | |
async def validate_xena(self): | |
return False | |
@app.route(["words_to_index"], vhost="/", options={Options.BULK_SIZE: 512}) | |
async def drain_handler(messages: List[RabbitMQMessage]): | |
print(messages) | |
app.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment