Skip to content

Instantly share code, notes, and snippets.

@pyhedgehog
Last active July 14, 2016 13:48
Show Gist options
  • Save pyhedgehog/2f3829f0e2f1738e62871c5fcc4d8427 to your computer and use it in GitHub Desktop.
Save pyhedgehog/2f3829f0e2f1738e62871c5fcc4d8427 to your computer and use it in GitHub Desktop.
# curl --data-urlencode source@- https://hook.io/marak/gateway-python <0hook.py
from __future__ import print_function
import os
import sys
import json
import pprint
import wsgiref.handlers
default_hook_path = '/pyhedgehog/issuestats-github'
def app(environ, start_response):
start_response('200 OK', [('content-type', 'text/plain')])
for k in Hook['env']:
if k in environ:
del environ[k]
if 'hook_private_key' in environ:
del environ['hook_private_key']
res = ['== wsgi environment']
res.extend('%s=%s'%(k,v) for k,v in sorted(environ.items()))
res.append('')
res.append('== os environment')
res.extend('%s=%s'%(k,v) for k,v in sorted(os.environ.items()))
res.append('')
if 'hookAccessKey' in Hook:
del Hook['hookAccessKey']
if 'hookAccessKey' in Hook['env']:
del Hook['env']['hookAccessKey']
if 'env' in Hook:
Hook['env'].clear()
res.append('== Hook')
res.append(pprint.pformat(Hook))
return str('\n'.join(res))
class HookIOHandler(wsgiref.handlers.CGIHandler):
def __init__(self):
wsgiref.handlers.BaseCGIHandler.__init__(
self, sys.stdin, sys.stdout, sys.stderr, {},
multithread=False, multiprocess=True
)
def send_headers(self):
self.cleanup_headers()
self.headers_sent = True
head = {'type': 'writeHead', 'payload': {'code': self.status, 'headers': dict(self.headers)}}
sys.stderr.write(json.dumps(head)+'\n')
sys.stderr.flush()
def add_cgi_vars(self):
#assert not Hook['isStreaming'], 'Streaming hooks not yet supported by WSGI gateway'
self.environ.update(Hook['env'])
self.environ['hook_private_key'] = Hook['hookAccessKey']
self.environ['SERVER_NAME'] = Hook['req']['host']
self.environ['GATEWAY_INTERFACE'] = 'CGI/1.1'
self.environ['SERVER_PORT'] = '443'
self.environ['REMOTE_ADDR'] = self.environ['REMOTE_HOST'] = Hook['req']['connection']['remoteAddress']
self.environ['CONTENT_LENGTH'] = ''
self.environ['SCRIPT_NAME'] = ''
self.environ['SERVER_PROTOCOL'] = 'HTTP/1.0'
self.environ['REQUEST_METHOD'] = Hook['req']['method']
self.environ['PATH_INFO'] = Hook['req']['path']
self.environ['QUERY_STRING'] = Hook['req']['url'][len(Hook['req']['path'])+1:]
self.environ['CONTENT_TYPE'] = Hook['req']['headers'].get('content-type', '')
if 'content-length' in Hook['req']['headers']:
self.environ['CONTENT_LENGTH'] = Hook['req']['headers']['content-length']
for k,v in Hook['req']['headers'].items():
k = k.replace('-', '_').upper()
v = v.strip()
if k in self.environ:
continue
self.environ['HTTP_'+k] = v
wsgiref.util.shift_path_info(self.environ) # user
wsgiref.util.shift_path_info(self.environ) # hook
def main():
HookIOHandler().run(app)
if __name__=='__main__':
main()
#!/usr/bin/env python
import os
import sys
import argparse
import wsgiref.handlers
import wsgiref.simple_server
from six.moves.urllib.parse import urlsplit, parse_qsl
def main(argv=None):
argv = argv or sys.argv
p = argparse.ArgumentParser(prog=argv[0], description='Run (specially crafted) python hook as a server or CGI')
p.add_argument('-p', '--port', default=4000, help='Port of server (4000)')
p.add_argument('-l', '--listen', default='127.0.0.1', help='Port of server (127.0.0.1)')
p.add_argument('-s', '--server', action='store_true', help='Run as HTTP server')
p.add_argument('-c', '--cgi', action='store_true', help='Run as CGI')
p.add_argument('-u', '--uri', default='', help='Debug hook with given URI (relative to base hook path)')
p.add_argument('-m', '--module', default='0hook', help='Module to import')
p.add_argument('-H', '--hook', default='', help='Path to hook in form /owner/name')
p.add_argument('-a', '--anonymous', action='store_true', help='Run as anonymous')
p.add_argument('-g', '--gist', default=None, help='Gist url')
args = p.parse_args(argv[1:])
assert not (args.server and args.cgi and args.uri), "Can't run in several modes at once (choose URI-debug, CGI or server)"
mod = __import__(args.module)
if args.cgi:
cgi_app = get_app(mod, args)
return wsgiref.handlers.CGIHandler().run(cgi_app)
if args.server:
cgi_app = get_app(mod, args)
wsgiref.simple_server.ServerHandler.os_environ = {}
httpd = wsgiref.simple_server.make_server(args.listen, args.port, cgi_app)
host, port = httpd.socket.getsockname()
print("Serving http://%s:%d/" % (host, port))
return httpd.serve_forever()
patch_hook(mod, args.hook, args.uri, args.gist, args.anonymous)
return mod.main()
def get_app(mod, args):
def cgi_app(environ, start_response):
uri = environ.get('PATH_INFO', '')
if uri == '/favicon.ico':
start_response('404 Not found', [])
return ''
qs = environ.get('QUERY_STRING', '')
if qs:
uri += '?' + qs
#print('cgi_app: url='+repr(uri))
patch_hook(mod, args.hook, uri, args.gist, args.anonymous)
return mod.app(environ, start_response)
return cgi_app
def patch_hook(mod, hook=None, uri=None, gist=None, anon=False):
hook = hook or mod.default_hook_path
owner, name = hook.strip('/').split('/', 1)
uri = uri or ''
path, qs = urlsplit(uri)[2:4]
params = dict(parse_qsl(qs))
mod.Hook = Hook = dict(
env={},
hookAccessKey=None,
req=dict(
connection={'remoteAddress':'127.0.0.1'},
method='GET',
url=hook+uri,
path=hook+path,
host='hook.io',
params={
'owner': owner,
'hook': name,
'0': path,
},
headers={
'accept': 'text/html,text/plain',
'accept-language': 'en',
'connection': 'keep-alive',
'dnt': '1',
'host': 'hook.io',
'transfer-encoding': 'chunked',
'user-agent': 'Mozilla Compatible (Some OS)',
'x-forwarded-for': '::ffff:123.456.78.90',
'x-hookio-user-session-name': ['anonymous',owner][not anon],
},
),
params=params,
)
if gist:
Hook['params']['gist'] = Hook['req']['params']['gist'] = gist
Hook['input'] = Hook['req']
if __name__=='__main__':
sys.exit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment