-
-
Save pyhedgehog/2f3829f0e2f1738e62871c5fcc4d8427 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usage: curl --data-urlencode source@- https://hook.io/marak/gateway-python <0hook.py
from __future__ import print_function | |
import os | |
import sys | |
import json | |
import pprint | |
import wsgiref.handlers | |
# Hook path in the form ``/owner/name``; read by the local runner as
# ``mod.default_hook_path`` when no explicit hook path is supplied.
default_hook_path = '/pyhedgehog/issuestats-github'
def app(environ, start_response):
    """WSGI debug application that dumps the request context as plain text.

    The response lists the WSGI environ, the process ``os.environ`` and a
    pretty-printed copy of the module-level ``Hook`` dict (which is expected
    to be injected by the caller before the app runs).

    Before anything is rendered, every key that came from ``Hook['env']`` and
    the ``hook_private_key`` entry are removed from ``environ``; the access
    key and the ``env`` mapping are then scrubbed from ``Hook`` itself so they
    do not show up in the dump. Both ``environ`` and ``Hook`` are mutated.

    NOTE(review): ``os.environ`` is emitted unfiltered; this is only
    acceptable for a debugging hook.

    Returns:
        A one-element list holding the UTF-8 encoded body, as required of a
        WSGI application (an iterable of byte strings).
    """
    start_response('200 OK', [('content-type', 'text/plain')])

    # ``Hook`` may lack an 'env' entry; treat that the same as an empty one.
    hook_env = Hook.get('env', {})
    for key in list(hook_env):
        environ.pop(key, None)
    environ.pop('hook_private_key', None)

    lines = ['== wsgi environment']
    lines.extend('%s=%s' % (k, v) for k, v in sorted(environ.items()))
    lines.append('')
    lines.append('== os environment')
    lines.extend('%s=%s' % (k, v) for k, v in sorted(os.environ.items()))
    lines.append('')

    # Scrub secrets from Hook before it is pretty-printed into the response.
    Hook.pop('hookAccessKey', None)
    hook_env.clear()
    lines.append('== Hook')
    lines.append(pprint.pformat(Hook))

    body = '\n'.join(lines)
    if not isinstance(body, bytes):
        # Text on Python 3 (or unicode on Python 2): encode once here.
        # 'replace' keeps undecodable environment values from aborting the dump.
        body = body.encode('utf-8', 'replace')
    return [body]
class HookIOHandler(wsgiref.handlers.CGIHandler):
    """CGI-style WSGI handler driven by the module-level ``Hook`` dict.

    The request environ is built from ``Hook`` instead of ``os.environ``, the
    response body is written to stdout, and the response head is emitted as a
    single JSON line on stderr.
    """

    def __init__(self):
        # Deliberately bypass CGIHandler.__init__ so an empty base environ is
        # used. On Python 3 the WSGI body is bytes, so the binary ``.buffer``
        # streams are needed; Python 2 file objects have no ``.buffer`` and
        # are used as they are.
        stdin = getattr(sys.stdin, 'buffer', sys.stdin)
        stdout = getattr(sys.stdout, 'buffer', sys.stdout)
        wsgiref.handlers.BaseCGIHandler.__init__(
            self, stdin, stdout, sys.stderr, {},
            multithread=False, multiprocess=True
        )

    def send_headers(self):
        """Write status and headers as one JSON ``writeHead`` line to stderr."""
        self.cleanup_headers()
        self.headers_sent = True
        head = {
            'type': 'writeHead',
            'payload': {'code': self.status, 'headers': dict(self.headers)},
        }
        sys.stderr.write(json.dumps(head) + '\n')
        sys.stderr.flush()

    def add_cgi_vars(self):
        """Populate ``self.environ`` with CGI variables taken from ``Hook``."""
        # wsgiref.util is only imported transitively at module level; import
        # the helper explicitly where it is used.
        from wsgiref.util import shift_path_info

        #assert not Hook['isStreaming'], 'Streaming hooks not yet supported by WSGI gateway'
        req = Hook['req']
        headers = req['headers']
        environ = self.environ

        environ.update(Hook['env'])
        environ['hook_private_key'] = Hook['hookAccessKey']
        environ['SERVER_NAME'] = req['host']
        environ['GATEWAY_INTERFACE'] = 'CGI/1.1'
        environ['SERVER_PORT'] = '443'
        environ['REMOTE_ADDR'] = environ['REMOTE_HOST'] = req['connection']['remoteAddress']
        environ['SCRIPT_NAME'] = ''
        environ['SERVER_PROTOCOL'] = 'HTTP/1.0'
        environ['REQUEST_METHOD'] = req['method']
        environ['PATH_INFO'] = req['path']
        # Everything after "<path>?" in the URL is the query string.
        environ['QUERY_STRING'] = req['url'][len(req['path']) + 1:]
        environ['CONTENT_TYPE'] = headers.get('content-type', '')
        environ['CONTENT_LENGTH'] = headers.get('content-length', '')

        for name, value in headers.items():
            cgi_name = name.replace('-', '_').upper()
            # Headers already present under their bare CGI name (for example
            # CONTENT_TYPE) are not duplicated with an HTTP_ prefix.
            if cgi_name in environ:
                continue
            environ['HTTP_' + cgi_name] = value.strip()

        # Move the two leading path segments into SCRIPT_NAME.
        shift_path_info(environ)  # user
        shift_path_info(environ)  # hook
def main():
    """Handle one request by running ``app`` under a ``HookIOHandler``."""
    handler = HookIOHandler()
    handler.run(app)
# Script entry point: serve the request when executed directly.
if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import sys | |
import argparse | |
import wsgiref.handlers | |
import wsgiref.simple_server | |
from six.moves.urllib.parse import urlsplit, parse_qsl | |
def main(argv=None):
    """Run a hook module locally as a CGI script, an HTTP server, or once.

    Args:
        argv: Full argument vector including the program name; falls back to
            ``sys.argv`` when empty or ``None``.

    Returns:
        Whatever the selected mode returns: the CGI handler's ``run`` result,
        the result of ``serve_forever`` (server mode), or ``mod.main()``
        (single-shot URI debug mode, the default).

    Exits via ``argparse`` (status 2) on invalid arguments, including when
    more than one of ``--server``, ``--cgi`` and ``--uri`` is given.
    """
    import importlib

    argv = argv or sys.argv
    p = argparse.ArgumentParser(prog=argv[0], description='Run (specially crafted) python hook as a server or CGI')
    # type=int: make_server needs an integer port, not the raw CLI string.
    p.add_argument('-p', '--port', type=int, default=4000, help='Port of server (4000)')
    p.add_argument('-l', '--listen', default='127.0.0.1', help='Address to listen on (127.0.0.1)')
    p.add_argument('-s', '--server', action='store_true', help='Run as HTTP server')
    p.add_argument('-c', '--cgi', action='store_true', help='Run as CGI')
    p.add_argument('-u', '--uri', default='', help='Debug hook with given URI (relative to base hook path)')
    p.add_argument('-m', '--module', default='0hook', help='Module to import')
    p.add_argument('-H', '--hook', default='', help='Path to hook in form /owner/name')
    p.add_argument('-a', '--anonymous', action='store_true', help='Run as anonymous')
    p.add_argument('-g', '--gist', default=None, help='Gist url')
    args = p.parse_args(argv[1:])

    # The modes are mutually exclusive; reject any combination of two or more.
    # A real error (not an assert) so the check survives ``python -O``.
    selected_modes = sum(bool(flag) for flag in (args.server, args.cgi, args.uri))
    if selected_modes > 1:
        p.error("Can't run in several modes at once (choose URI-debug, CGI or server)")

    # import_module returns the named module itself, also for dotted names.
    mod = importlib.import_module(args.module)

    if args.cgi:
        cgi_app = get_app(mod, args)
        return wsgiref.handlers.CGIHandler().run(cgi_app)

    if args.server:
        cgi_app = get_app(mod, args)
        # Start each request's WSGI environ from an empty base environment.
        wsgiref.simple_server.ServerHandler.os_environ = {}
        httpd = wsgiref.simple_server.make_server(args.listen, args.port, cgi_app)
        # IPv6 sockets report a 4-tuple; only host and port are needed.
        host, port = httpd.socket.getsockname()[:2]
        print("Serving http://%s:%d/" % (host, port))
        return httpd.serve_forever()

    patch_hook(mod, args.hook, args.uri, args.gist, args.anonymous)
    return mod.main()
def get_app(mod, args):
    """Return a WSGI callable that re-patches ``mod.Hook`` on every request.

    For each request the URI is rebuilt from ``PATH_INFO`` and
    ``QUERY_STRING``, passed to ``patch_hook`` together with the hook, gist
    and anonymous settings from ``args``, and the call is then delegated to
    ``mod.app``. Requests for ``/favicon.ico`` are answered with a 404 and an
    empty body without touching the hook.
    """
    def cgi_app(environ, start_response):
        request_path = environ.get('PATH_INFO', '')
        if request_path == '/favicon.ico':
            start_response('404 Not found', [])
            return ''

        query = environ.get('QUERY_STRING', '')
        target = request_path + '?' + query if query else request_path
        #print('cgi_app: url='+repr(target))
        patch_hook(mod, args.hook, target, args.gist, args.anonymous)
        return mod.app(environ, start_response)

    return cgi_app
def patch_hook(mod, hook=None, uri=None, gist=None, anon=False):
    """Install a synthetic ``Hook`` dict on ``mod`` describing a GET request.

    Args:
        mod: Module object; ``mod.Hook`` is (re)assigned. Its
            ``default_hook_path`` is used when ``hook`` is empty.
        hook: Hook path in the form ``/owner/name``.
        uri: Request URI relative to the hook path (path plus optional query).
        gist: Optional gist value stored under ``'gist'`` in both parameter
            dicts.
        anon: When true, the session-name header is ``'anonymous'`` instead
            of the hook owner.
    """
    hook_path = hook or mod.default_hook_path
    owner, name = hook_path.strip('/').split('/', 1)
    rel_uri = uri or ''
    parts = urlsplit(rel_uri)
    path, qs = parts[2], parts[3]

    session_name = 'anonymous' if anon else owner
    request_headers = {
        'accept': 'text/html,text/plain',
        'accept-language': 'en',
        'connection': 'keep-alive',
        'dnt': '1',
        'host': 'hook.io',
        'transfer-encoding': 'chunked',
        'user-agent': 'Mozilla Compatible (Some OS)',
        'x-forwarded-for': '::ffff:123.456.78.90',
        'x-hookio-user-session-name': session_name,
    }
    request = {
        'connection': {'remoteAddress': '127.0.0.1'},
        'method': 'GET',
        'url': hook_path + rel_uri,
        'path': hook_path + path,
        'host': 'hook.io',
        'params': {
            'owner': owner,
            'hook': name,
            '0': path,
        },
        'headers': request_headers,
    }
    hook_data = {
        'env': {},
        'hookAccessKey': None,
        'req': request,
        'params': dict(parse_qsl(qs)),
    }
    mod.Hook = hook_data

    if gist:
        hook_data['params']['gist'] = gist
        request['params']['gist'] = gist
    # 'input' is an alias for the very same request dict.
    hook_data['input'] = request
# Script entry point: propagate main()'s result as the process exit status.
if __name__ == '__main__':
    exit_status = main(sys.argv)
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment