Last active
February 19, 2024 15:52
-
-
Save bstaletic/4dc95922c39e045357f6972b2a695783 to your computer and use it in GitHub Desktop.
The bare minimum needed for supporting YouCompleteMe
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# TODO: Handle headers that do not exist.
# TODO: Documentation.
# TODO: Type hints.
import http.client | |
import json | |
import sys | |
import traceback | |
_MEMFILE_MAX = 10 * 1024 * 1024 | |
_WSGI_HEADERS = { 'CONTENT_LENGTH', | |
'CONTENT_TYPE', | |
'REQUEST_METHOD', | |
'SCRIPT_NAME', | |
'PATH_INFO', | |
'QUERY_STRING', | |
'SERVER_NAME', | |
'SERVER_PROTOCOL', | |
'SERVER_PORT' } | |
class HTTPError( Exception ):
  """Exception that carries an HTTP status code and a response body.

  Attributes:
    status: numeric HTTP status code (defaults to 500).
    body: text to send back to the client (defaults to '').
    exception: the underlying exception object, or None.
    traceback: traceback information for the underlying exception, or None.
  """

  def __init__( self,
                status = 500,
                body = '',
                exception = None,
                traceback = None ):
    self.status = status
    self.body = body
    # Store the exception object itself. A stray trailing comma used to turn
    # this attribute into a one-element tuple.
    self.exception = exception
    self.traceback = traceback
class Response:
  """Collects the headers that will be sent with an HTTP response."""

  def __init__( self ):
    # List of ( name, value ) tuples in the order they were added.
    self.headers = []

  def set_header( self, name : str, value : str ):
    """Append a header to the response.

    The name is title-cased and underscores become dashes, so 'content_type'
    is stored as 'Content-Type'. Existing entries with the same name are not
    replaced; the new pair is simply appended.
    """
    # Must target the same list that __init__ creates and that the
    # application reads back (`headers`, not `_headers`).
    self.headers.append( ( name.title().replace( '_', '-' ), value ) )
def _FixRequestHeaderName( key ):
  """Translate a header name into the key used for environ lookups.

  Dashes become underscores and the name is upper-cased. Names listed in
  _WSGI_HEADERS are returned in that form; all others get an 'HTTP_' prefix.
  """
  normalized = key.replace( '-', '_' )
  normalized = normalized.upper()
  return normalized if normalized in _WSGI_HEADERS else 'HTTP_' + normalized
class Query:
  """Read-only, attribute-style view of a URL query string.

  `Query( 'a=1&b=2' ).a` evaluates to `'1'`. Names and values are stored
  exactly as they appear in the string (no percent-decoding is performed).
  When a name occurs more than once, the last value wins.
  """

  def __init__( self, query_str : str ):
    self._query = {}
    for chunk in query_str.split( '&' ):
      # An empty query string or stray '&' separators yield empty chunks.
      if not chunk:
        continue
      # Split on the first '=' only: values may contain '=' themselves, and a
      # bare name without '=' maps to the empty string.
      name, _, value = chunk.partition( '=' )
      self._query[ name ] = value

  def __getattr__( self, query_name : str ):
    """Return the value of query parameter `query_name`.

    Raises AttributeError when the parameter is not present, so `hasattr()`
    and `getattr()` with a default behave as usual.
    """
    # __getattr__ runs only after normal attribute lookup has failed. The
    # previous __getattribute__ override intercepted *every* lookup,
    # including `self._query`, and therefore recursed without end. Going
    # through __dict__ also keeps this safe on a half-initialised instance.
    try:
      return self.__dict__[ '_query' ][ query_name ]
    except KeyError:
      raise AttributeError( query_name ) from None
class RequestHeaders( dict ):
  """Copy of the request environ whose item lookup normalises header names.

  `headers[ name ]` passes `name` through _FixRequestHeaderName before the
  lookup. Only `[]` access is normalised; `get()` and `in` use the raw key.
  A missing key raises KeyError.
  """

  def __init__( self, env ):
    super().__init__( env )

  def __getitem__( self, key : str ):
    key = _FixRequestHeaderName( key )
    # A `super()` proxy cannot be subscripted, so the parent lookup has to be
    # invoked through the method explicitly.
    return super().__getitem__( key )
class Request:
  """Wraps a WSGI environ dictionary for use by route callbacks.

  Attributes:
    env: the environ dictionary that was passed in.
    query: a Query built from the environ's QUERY_STRING.
    headers: a RequestHeaders built from the environ.
  """

  def __init__( self, env ):
    self.env = env
    # PEP 3333 allows QUERY_STRING to be absent; treat that like an empty
    # query string instead of failing with KeyError.
    self.query = Query( env.get( 'QUERY_STRING', '' ) )
    self.headers = RequestHeaders( env )

  @property
  def json( self ):
    """Parse the 'wsgi.input' stream as JSON and return the result.

    The stream is read on every access.
    """
    return json.load( self.env[ 'wsgi.input' ] )

  @property
  def body( self ):
    """Return the environ's 'wsgi.input' object without reading from it."""
    return self.env[ 'wsgi.input' ]
def _ApplyPlugins( plugins, callback ): | |
for plugin in plugins: | |
callback = plugin( callback ) | |
return callback | |
class AppProducer:
  """Minimal WSGI application with exact-path routing for GET and POST.

  Route callbacks are registered with the `get` / `post` decorators (or
  `AddPath`) and are called as `callback( request, response )`. The value
  they return becomes the response body.

  Attributes:
    plugins: iterable of callables applied to every registered callback.
    get_routes: maps a path to the callback used for non-POST requests.
    post_routes: maps a path to the callback used for POST requests.
  """

  def __init__( self, plugins ):
    self.plugins = plugins
    self.get_routes = {}
    self.post_routes = {}

  def ErrorHandler( self, http_error : HTTPError, response = None ):
    """Placeholder error handler; install a real one with SetErrorHandler.

    It is invoked as `ErrorHandler( http_error, response )`, so the
    placeholder accepts both arguments before raising.
    """
    raise RuntimeError( "Should be replaced by application." )

  def SetErrorHandler( self, error_handler ):
    """Install `error_handler( http_error, response )` for callback failures.

    Its return value is used as the body of the error response.
    """
    self.ErrorHandler = error_handler

  def post( self, path : str ):
    """Return a decorator that registers a callback for POST on `path`."""
    return self.AddRouteDecorator( path, 'POST' )

  def get( self, path : str ):
    """Return a decorator that registers a callback for GET on `path`."""
    return self.AddRouteDecorator( path, 'GET' )

  def AddRouteDecorator( self, path : str, method : str ):
    """Return a decorator that registers its target and returns it unchanged."""
    def Decorator( Callback ):
      self.AddPath( path, method, Callback )
      return Callback
    return Decorator

  def AddPath( self, path : str, method : str, callback ):
    """Register `callback`, wrapped by the plugins, for `method` on `path`.

    'POST' goes to `post_routes`; any other method goes to `get_routes`,
    matching the dispatch in `__call__`.
    """
    callback = _ApplyPlugins( self.plugins, callback )
    # These two tables used to be swapped, so no request could ever reach the
    # callback that was registered for it.
    if method == 'POST':
      self.post_routes[ path ] = callback
    else:
      self.get_routes[ path ] = callback

  @staticmethod
  def _StatusLine( code ):
    """Build a WSGI status string such as '404 Not Found' from a code."""
    return '{} {}'.format( code, http.client.responses.get( code, 'Unknown' ) )

  @staticmethod
  def _ContentLength( environ ):
    """Return the request's CONTENT_LENGTH as an int; 0 if absent or empty.

    Raises HTTPError( 400 ) when the value is not an integer.
    """
    # WSGI passes CONTENT_LENGTH as a string and may omit it entirely.
    try:
      return int( environ.get( 'CONTENT_LENGTH' ) or 0 )
    except ( TypeError, ValueError ):
      raise HTTPError( 400, 'Invalid Content-Length' ) from None

  @staticmethod
  def _Encode( out ):
    """Return `out` as bytes, encoding text as UTF-8."""
    if isinstance( out, str ):
      return out.encode( 'utf-8' )
    return out

  def __call__( self, environ, start_response ):
    """WSGI entry point: dispatch the request and return the response body.

    - An HTTPError raised anywhere produces a response with that error's
      status and body.
    - Any other exception raised by a route callback is wrapped in an
      HTTPError( 500 ) and handed to `ErrorHandler`, whose return value
      becomes the body.
    - A path without a registered callback produces a 404.

    The return value is a one-element list holding the encoded body.
    """
    try:
      status = self._StatusLine( 200 )
      request = Request( environ )
      response = Response()
      if self._ContentLength( environ ) > _MEMFILE_MAX:
        raise HTTPError( 413, 'Request too large' )

      if environ[ 'REQUEST_METHOD' ] == 'POST':
        routes = self.post_routes
      else:
        routes = self.get_routes
      # Look the route up separately from running it, so that an unknown
      # path is a 404 and cannot be confused with a KeyError raised inside
      # a callback.
      callback = routes.get( environ.get( 'PATH_INFO', '' ) )
      if callback is None:
        raise HTTPError( 404, 'Not found' )

      try:
        out = callback( request, response )
      except HTTPError:
        raise
      except Exception:
        e = HTTPError( 500,
                       'Internal server error',
                       sys.exc_info()[ 1 ],
                       traceback.format_exc() )
        # Start from a clean response so that headers set by the failed
        # callback do not leak into the error reply.
        response = Response()
        status = self._StatusLine( e.status )
        out = self.ErrorHandler( e, response )

      # Encode first: Content-Length counts bytes, not characters.
      body = self._Encode( out )
      response_headers = response.headers
      response_headers.append( ( 'Content-Length', str( len( body ) ) ) )
      start_response( status, response_headers )
      # WSGI expects an iterable of byte strings; iterating a bare bytes
      # object would yield integers.
      return [ body ]
    except HTTPError as e:
      status = self._StatusLine( e.status )
      body = self._Encode( e.body )
      response_headers = [ ( 'Content-Length', str( len( body ) ) ) ]
      start_response( status, response_headers, sys.exc_info() )
      return [ body ]
def abort( code, text ):
  """Stop handling the request by raising HTTPError( code, text )."""
  error = HTTPError( code, text )
  raise error
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment