Last active
May 25, 2023 20:34
-
-
Save an01f01/4b6f1dd0c62bb922a2941c303616d083 to your computer and use it in GitHub Desktop.
Base tornado 6.3 app
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import signal | |
import asyncio | |
from typing import Optional, Awaitable | |
import tornado.httpserver | |
import tornado.ioloop | |
import tornado.options | |
import tornado.web | |
WEBHOOK_SECRET = "setwebhook" | |
WEBHOOK_PORT = 8000 | |
tornado.options.define("port", default=WEBHOOK_PORT, help="run on the given port", type=int) | |
class BaseHandler(tornado.web.RequestHandler): | |
""" | |
Base handler gonna to be used instead of RequestHandler | |
""" | |
def write_error(self, status_code, **kwargs): | |
if status_code in [403, 404, 500, 503]: | |
self.write('Error %s' % status_code) | |
else: | |
self.write('BOOM!') | |
class ErrorHandler(tornado.web.ErrorHandler, BaseHandler): | |
""" | |
Default handler gonna to be used in case of 404 error | |
""" | |
pass | |
class MainHandler(BaseHandler): | |
def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]: | |
pass | |
def get(self): | |
self.write("Tornado v6.3 app is running!") | |
self.finish() | |
class WebhookServ(BaseHandler): | |
def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]: | |
pass | |
def get(self): | |
self.write("Webhook server handler") | |
self.finish() | |
def post(self): | |
if "Content-Length" in self.request.headers and \ | |
"Content-Type" in self.request.headers and \ | |
self.request.headers['Content-Type'] == "application/json": | |
# length = int(self.request.headers['Content-Length']) | |
json_data = self.request.body.decode("utf-8") | |
self.write(json_data) | |
self.finish() | |
else: | |
self.write("What are you doing here?") | |
self.finish() | |
def make_app(): | |
settings = dict( | |
cookie_secret=str(os.urandom(45)), | |
template_path=os.path.join(os.path.dirname(__file__), "templates"), | |
static_path=os.path.join(os.path.dirname(__file__), "static"), | |
default_handler_class=ErrorHandler, | |
default_handler_args=dict(status_code=404) | |
) | |
return tornado.web.Application([ | |
(r"/", MainHandler), | |
(r"/" + WEBHOOK_SECRET, WebhookServ) | |
], **settings) | |
async def main(): | |
print("starting tornado server..........") | |
app = make_app() | |
app.listen(tornado.options.options.port) | |
await asyncio.Event().wait() | |
if __name__ == '__main__': | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment