Skip to content

Instantly share code, notes, and snippets.

@ssadler
Last active December 20, 2016 17:00
Show Gist options
  • Save ssadler/dc6096a580bb80671080 to your computer and use it in GitHub Desktop.
Save ssadler/dc6096a580bb80671080 to your computer and use it in GitHub Desktop.
http expression
#!/usr/bin/env python
# HttpExpression (Python 2): a single-threaded HTTP server written as one
# lambda expression.  Statements are emulated by chaining calls that return
# None with ``or``; local names are bound by immediately applying lambdas.
#
# Parameters:
#   view        -- callable ``view(request, HttpResponse)`` returning the
#                  response object to send; if it raises, a 500 response
#                  carrying the traceback is sent instead.
#   port, host  -- address the listening socket is bound to.
#   recvlen     -- maximum number of bytes requested from each ``recv``.
#   recvtimeout -- timeout in seconds passed to ``select`` when polling for
#                  further request data after each received chunk.
#
# Serves one connection at a time until an exception (KeyboardInterrupt
# included) escapes the accept loop; the listening socket is then closed and
# the exception propagates to the caller.
main = lambda view, port, host='0.0.0.0', recvlen=4096, recvtimeout=0.0: (
    (lambda sys, socket, itertools, urllib, re, cStringIO, pprint, datetime,
            types, select, traceback:
        (lambda _try, recursive:
            (lambda HttpRequest, HttpResponse:
                (lambda conn2buf, serverloop, log_hit, errorview:
                    (lambda sock, handler:
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) or
                        sock.bind((host, port)) or sock.listen(1) or
                        sys.stdout.write('HttpExpression listening on %s:%s\n' % (host, port)) or
                        serverloop(handler, sock)
                    )(socket.socket(),
                      # handler: parse the request, run the view, log, reply.
                      (lambda conn, addr:
                          (lambda request:
                              (lambda response:
                                  log_hit(request, response) or
                                  # sendall, because send() may transmit only
                                  # part of a large response.  The client
                                  # connection (not the listening socket) is
                                  # closed whether or not the send succeeds.
                                  _try(lambda: conn.sendall(str(response)), onfinally=conn.close)
                              )(_try(lambda: view(request, HttpResponse),
                                     lambda e: errorview(request, HttpResponse)))
                          )(HttpRequest(addr, conn2buf(conn, cStringIO.StringIO())))))
                )(
                    # conn2buf: copy what the client sent into ``buf``, one
                    # recv() at a time, for as long as select() reports more
                    # data within ``recvtimeout``; returns ``buf`` rewound.
                    (lambda conn, buf:
                        recursive((lambda f, chunk:
                            chunk and (
                                buf.write(chunk) or
                                (conn in select.select([conn], [], [], recvtimeout)[0]) and
                                f(f, conn.recv(recvlen))) or
                            buf.seek(0) or buf))(conn.recv(recvlen))),
                    # serverloop: max() over an endless generator is the
                    # "while True"; every item accepts and handles one
                    # connection.
                    (lambda handler, sock:
                        _try((lambda: max(handler(*sock.accept()) or i+1 for i in itertools.count())),
                             None,
                             # Cleanup: a failing shutdown() must neither
                             # mask the original exception nor skip close().
                             (lambda: _try(lambda: sock.shutdown(socket.SHUT_RDWR),
                                           onfinally=sock.close, reraise=False)))),
                    # log_hit: one access-log line per request on stdout.
                    (lambda request, response:
                        sys.stdout.write('%s [%s] "%s" %s %sb\n' % (
                            request.remote_addr[0], datetime.datetime.now(),
                            request.rawheaders[0], response.status_code,
                            len(response)))),
                    # errorview: plain-text 500 page with the current
                    # traceback followed by a dump of the request.
                    (lambda request, HttpResponse:
                        HttpResponse(str(traceback.format_exc()+"\n\n\n"+str(request)),
                                     status_code=500, content_type='text/plain'))
                )
            )(
                # HTTPRequest: parses the buffered request on construction.
                type('HTTPRequest', (), {
                    'server_host': socket.gethostname(),
                    '__init__': lambda self, addr, request_file:
                        setattr(self, 'remote_addr', addr) or
                        # raw header lines up to the first blank line, with
                        # the trailing two characters (CRLF) cut off
                        setattr(self, 'rawheaders',
                                [l[:-2] for l in itertools.takewhile(lambda l: l.strip(), request_file)]) or
                        # parse info from first line of headers; map() returns
                        # a non-empty list here, hence ``and`` to continue
                        map(lambda *i: setattr(self, *i), ['method', 'path', 'query_string', 'http_version'],
                            re.match(r'^(\w+)\s+([^\?]*)\??(.*)\s+HTTP/(1\.\d)$', self.rawheaders[0]).groups()) and
                        # parse rest of headers into a name -> value dict
                        setattr(self, 'headers', dict(map(lambda l: map(str.lstrip, l.split(':', 1)),
                                                          self.rawheaders[1:]))) or
                        # parse get and post at the same time: the query
                        # string and the remaining body are both decoded as
                        # url-encoded ``name=value`` pairs; [0] is None, which
                        # is what __init__ has to return
                        map(lambda n, v: setattr(self, n, v), ['GET', 'POST'],
                            map((lambda qs: qs and dict(tuple(None if i is None else urllib.unquote_plus(i)
                                                              for i in urllib.splitvalue(v))
                                                        for v in qs.split('&')) or {}),
                                [self.query_string, request_file.read()]))[0],
                    '__str__': lambda self: ('<HttpRequest\n\n%s\n\nGET: %s\nPOST: %s\n>' %
                                             (('\n'.join(self.rawheaders),) +
                                              tuple(map(pprint.pformat, [self.GET, self.POST])))),
                }),
                # HTTPResponse: body plus headers; str() gives the wire form.
                type('HTTPResponse', (), {
                    # status code -> (reason phrase, long description)
                    'response_messages': {
                        100: ('Continue', 'Request received, please continue'),
                        101: ('Switching Protocols', 'Switching to new protocol; obey Upgrade header'),
                        200: ('OK', 'Request fulfilled, document follows'),
                        201: ('Created', 'Document created, URL follows'),
                        202: ('Accepted', 'Request accepted, processing continues off-line'),
                        203: ('Non-Authoritative Information', 'Request fulfilled from cache'),
                        204: ('No Content', 'Request fulfilled, nothing follows'),
                        205: ('Reset Content', 'Clear input form for further input.'),
                        206: ('Partial Content', 'Partial content follows.'),
                        300: ('Multiple Choices', 'Object has several resources -- see URI list'),
                        301: ('Moved Permanently', 'Object moved permanently -- see URI list'),
                        302: ('Found', 'Object moved temporarily -- see URI list'),
                        303: ('See Other', 'Object moved -- see Method and URL list'),
                        304: ('Not Modified', 'Document has not changed since given time'),
                        305: ('Use Proxy', 'You must use proxy specified in Location to access this resource.'),
                        307: ('Temporary Redirect', 'Object moved temporarily -- see URI list'),
                        400: ('Bad Request', 'Bad request syntax or unsupported method'),
                        401: ('Unauthorized', 'No permission -- see authorization schemes'),
                        402: ('Payment Required', 'No payment -- see charging schemes'),
                        403: ('Forbidden', 'Request forbidden -- authorization will not help'),
                        404: ('Not Found', 'Nothing matches the given URI'),
                        405: ('Method Not Allowed', 'Specified method is invalid for this server.'),
                        406: ('Not Acceptable', 'URI not available in preferred format.'),
                        407: ('Proxy Authentication Required', 'You must authenticate with this proxy before proceeding.'),
                        408: ('Request Timeout', 'Request timed out; try again later.'),
                        409: ('Conflict', 'Request conflict.'),
                        410: ('Gone', 'URI no longer exists and has been permanently removed.'),
                        411: ('Length Required', 'Client must specify Content-Length.'),
                        412: ('Precondition Failed', 'Precondition in headers is false.'),
                        413: ('Request Entity Too Large', 'Entity is too large.'),
                        414: ('Request-URI Too Long', 'URI is too long.'),
                        415: ('Unsupported Media Type', 'Entity body in unsupported format.'),
                        416: ('Requested Range Not Satisfiable', 'Cannot satisfy request range.'),
                        417: ('Expectation Failed', 'Expect condition could not be satisfied.'),
                        500: ('Internal Server Error', 'Server got itself in trouble'),
                        501: ('Not Implemented', 'Server does not support this operation'),
                        502: ('Bad Gateway', 'Invalid responses from another server/proxy.'),
                        503: ('Service Unavailable', 'The server cannot process the request due to a high load'),
                        504: ('Gateway Timeout', 'The gateway server did not receive a timely response'),
                        505: ('HTTP Version Not Supported', 'Cannot fulfill request.'),
                    },
                    '__init__': (lambda self, data='', status_code=200, content_type='text/html':
                        setattr(self, '_data', data) or
                        setattr(self, 'status_code', status_code) or
                        setattr(self, 'content_type', content_type) or
                        setattr(self, 'headers', {
                            'Server': 'HttpExpression',
                            'Connection': 'close',
                            'Content-Length': str(len(data)),
                            'Content-Type': content_type,
                        })
                    ),
                    '__len__': lambda self: len(self._data),
                    # Status line, headers, blank line, body.  The version
                    # token must be numeric ("HTTP/1.x" is not a valid
                    # HTTP-version); the reason phrase comes from the class's
                    # own table.
                    '__str__': lambda self: 'HTTP/1.0 %s %s\r\n%s\r\n\r\n%s' %
                        (self.status_code, self.response_messages[self.status_code][0],
                         '\r\n'.join(map(': '.join, self.headers.items())),
                         self._data),
                }),
            )
        )(
            # exception handler - cheater!@!!  try/except/finally has no
            # expression form, so the helper is compiled from source text and
            # its code object wrapped in a function with explicit defaults.
            # _try(func, onexcept, onfinally, reraise) returns func(); on an
            # exception it returns onexcept(e) when that is truthy, otherwise
            # re-raises unless ``reraise`` is false; onfinally always runs.
            types.FunctionType(compile("""\
def _try(func, onexcept=None, onfinally=None, reraise=True):
    try:
        return func()
    except (Exception, KeyboardInterrupt) as e:
        r = onexcept(e) if onexcept else None
        if r: return r
        if reraise: raise
    finally:
        if onfinally: onfinally()""", __file__, 'exec').co_consts[0], globals(), '_try',
                               (lambda: None, None, None, True)),
            # recursion helper: hands ``f`` itself as its first argument so a
            # lambda can call itself
            lambda f: (lambda *p, **kw: f(f, *p, **kw))
        )
    )(*map(__import__, ['sys', 'socket', 'itertools', 'urllib', 're', 'cStringIO', 'pprint',
                        'datetime', 'types', 'select', 'traceback']))
)
############### Views
from xml.sax import saxutils
from pygments import highlight
from pygments import lexers
from pygments.formatters import HtmlFormatter
# One <option> per Pygments lexer: the value is the lexer's first alias (what
# get_lexer_by_name() is later called with), the label its display name.
# Lexers that declare no alias are skipped instead of raising IndexError, and
# both fields are escaped before being placed into the markup.
pygment_lexers = [
    '<option value="%s">%s</option>' % (
        saxutils.escape(l[1][0], {'"': '&quot;'}), saxutils.escape(l[0]))
    for l in lexers.get_all_lexers() if l[1]
]
pygment_lexers = ''.join(sorted(pygment_lexers))
# Paste form: a textarea for the code and a lexer drop-down whose empty
# "Guess" entry lets the view pick the lexer itself.
code_highlighter_form = """\
<form action="." method="post">
<textarea name="code" style="width:60%; height: 60%;"></textarea>
<div style="color: red;">{{ form_error }}</div>
<br />
<select name="lexer"><option value="">Guess</option>""" + pygment_lexers + """</select>
<br />
<input type="submit" value="submit" />
</form>"""
# Result page template; the two %s slots take the CSS rules and the body.
html_doc = """\
<html>
<head>
<style type="text/css">
%s
</style>
</head>
<body>
%s
</body>
</html>"""
# NOTE(review): full=True presumably makes the formatter emit a complete HTML
# document, which code_highlighter then embeds inside html_doc -- confirm
# that this nesting is intended.
html_formatter = HtmlFormatter(full=True, noclasses=True, linenos='table')
def code_highlighter(request, HttpResponse):
    """Serve the paste form on GET and the highlighted code otherwise.

    Args:
        request: the parsed request; its ``method`` attribute and ``POST``
            dict (fields ``code`` and ``lexer``) are read.
        HttpResponse: response class, called with the HTML body.

    Returns:
        An ``HttpResponse`` holding the highlighted code, or the form again
        (with an error message) when the submission cannot be highlighted.
    """
    from pygments.util import ClassNotFound

    def form_response(error=''):
        # Fill the form's error placeholder, which would otherwise be shown
        # literally; the message is escaped because it may echo user input.
        return HttpResponse(code_highlighter_form.replace(
            '{{ form_error }}', saxutils.escape(error)))

    if request.method == "GET":
        return form_response()

    # .get() rather than indexing: a missing field is a user error to report
    # on the form, not a KeyError that ends up as a 500 page.
    code = request.POST.get('code')
    if not code:
        return form_response('Please enter some code to highlight.')

    lexer_name = request.POST.get('lexer')
    try:
        if lexer_name:
            lexer = lexers.get_lexer_by_name(lexer_name)
        else:
            lexer = lexers.guess_lexer(code)
    except ClassNotFound:
        if lexer_name:
            return form_response('Unknown lexer: %s' % lexer_name)
        return form_response('Could not guess a lexer for this code.')

    highlighted = highlight(code, lexer, html_formatter)
    htmldoc = html_doc % (html_formatter.get_style_defs('.highlight'), highlighted)
    return HttpResponse(htmldoc)
if __name__ == '__main__':
    # Script entry point: serve the code highlighter on the port given as the
    # only command-line argument.  A missing or non-numeric port produces a
    # usage message instead of an IndexError/ValueError traceback.
    argv = __import__('sys').argv
    try:
        port = int(argv[1])
    except (IndexError, ValueError):
        raise SystemExit('usage: %s PORT' % argv[0])
    main(code_highlighter, port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment