Skip to content

Instantly share code, notes, and snippets.

@an01f01
Last active May 25, 2023 20:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save an01f01/4b6f1dd0c62bb922a2941c303616d083 to your computer and use it in GitHub Desktop.
Save an01f01/4b6f1dd0c62bb922a2941c303616d083 to your computer and use it in GitHub Desktop.
Base tornado 6.3 app
import os
import signal
import asyncio
from typing import Optional, Awaitable
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
# URL path segment under which the webhook handler is mounted (see make_app).
WEBHOOK_SECRET = "setwebhook"
# Port used as the default value of the "port" option defined below.
WEBHOOK_PORT = 8000

# Register the "port" option with Tornado's global options object.
tornado.options.define(
    "port",
    default=WEBHOOK_PORT,
    help="run on the given port",
    type=int,
)
class BaseHandler(tornado.web.RequestHandler):
    """Common parent for this app's handlers, used in place of RequestHandler.

    Overrides error rendering so that every derived handler produces the
    same plain-text error bodies.
    """

    def write_error(self, status_code, **kwargs):
        """Write the plain-text body for an error response.

        Args:
            status_code: HTTP status code of the error being reported.
            **kwargs: Additional error details; not used by this method.
        """
        # Only these statuses get a numbered message; every other status
        # shares one generic body.
        numbered_statuses = (403, 404, 500, 503)
        if status_code not in numbered_statuses:
            self.write('BOOM!')
            return
        self.write('Error %s' % status_code)
class ErrorHandler(tornado.web.ErrorHandler, BaseHandler):
    """Default handler used when no route matches (404 case).

    Combines tornado.web.ErrorHandler with BaseHandler and adds no behavior
    of its own; mixing in BaseHandler is meant to make its write_error
    formatting apply to these responses as well.
    """
class MainHandler(BaseHandler):
    """Handler for the root URL; replies with a fixed status message."""

    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
        """No-op implementation; always returns None."""
        return None

    def get(self):
        """Answer GET requests with the running-status text."""
        # finish(chunk) writes the chunk and then ends the response.
        self.finish("Tornado v6.3 app is running!")
class WebhookServ(BaseHandler):
    """Webhook endpoint: GET returns a banner, POST echoes JSON request bodies."""

    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
        """No-op implementation; always returns None."""
        pass

    def get(self):
        """Answer GET requests with a fixed banner string."""
        self.write("Webhook server handler")
        self.finish()

    def post(self):
        """Echo the request body back when the request is declared as JSON.

        A request qualifies when it carries a Content-Length header and its
        Content-Type media type is ``application/json`` (parameters such as
        ``; charset=utf-8`` and letter case are ignored). Qualifying bodies
        are decoded as UTF-8 and written back unchanged with a JSON content
        type. Bodies that are not valid UTF-8 get a 400 response. Any other
        request gets the fixed "What are you doing here?" reply.
        """
        headers = self.request.headers
        # The media type is case-insensitive and may be followed by
        # parameters, so compare only the normalized type itself.
        content_type = headers.get("Content-Type", "")
        media_type = content_type.split(";", 1)[0].strip().lower()

        if "Content-Length" not in headers or media_type != "application/json":
            self.write("What are you doing here?")
            self.finish()
            return

        try:
            json_data = self.request.body.decode("utf-8")
        except UnicodeDecodeError:
            # Malformed client input is a client error, not a server fault.
            self.set_status(400)
            self.write("Request body is not valid UTF-8")
            self.finish()
            return

        # The payload is echoed verbatim, so label it as JSON; otherwise the
        # default HTML content type would let a browser render untrusted text.
        self.set_header("Content-Type", "application/json; charset=UTF-8")
        self.write(json_data)
        self.finish()
def make_app():
    """Build the Tornado application with its routes and settings.

    Routes:
        "/"                    -> MainHandler
        "/" + WEBHOOK_SECRET   -> WebhookServ
    Any other path is served by ErrorHandler with status code 404.

    Returns:
        A configured tornado.web.Application instance.
    """
    base_dir = os.path.dirname(__file__)
    settings = dict(
        # Pass the random bytes directly: str() on bytes yields the
        # "b'...'" repr text (and raises BytesWarning under ``python -bb``)
        # rather than the key material itself.
        # NOTE: a fresh secret is generated on every call, so signed cookies
        # do not survive a restart.
        cookie_secret=os.urandom(45),
        template_path=os.path.join(base_dir, "templates"),
        static_path=os.path.join(base_dir, "static"),
        default_handler_class=ErrorHandler,
        default_handler_args=dict(status_code=404),
    )
    routes = [
        (r"/", MainHandler),
        (r"/" + WEBHOOK_SECRET, WebhookServ),
    ]
    return tornado.web.Application(routes, **settings)
async def main():
    """Parse command-line options, start the HTTP server and run forever.

    The server listens on the port given by the "port" option, which
    defaults to WEBHOOK_PORT and can be overridden with ``--port=N``.
    """
    # Without this call the "port" option defined at module level is never
    # read from the command line, so --port=N would be silently ignored.
    tornado.options.parse_command_line()
    print("starting tornado server..........")
    app = make_app()
    app.listen(tornado.options.options.port)
    # Wait on an event that is never set, keeping the event loop alive.
    await asyncio.Event().wait()
if __name__ == '__main__':
    # Script entry point: run the main() coroutine on a new event loop.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment