Skip to content

Instantly share code, notes, and snippets.

@0xpizza
Last active November 7, 2023 00:59
Show Gist options
  • Save 0xpizza/3d63308a2baff93793fe03dcfeed8622 to your computer and use it in GitHub Desktop.
Save 0xpizza/3d63308a2baff93793fe03dcfeed8622 to your computer and use it in GitHub Desktop.
#coding:ascii
import argparse
import asyncio
import http.server
import logging
import ssl
import socket
import socketserver
import selectors
import threading
import warnings
from functools import partial
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s'
)

# Address and port the proxy listens on. The address is whatever the local
# hostname resolves to; fall back to loopback when the hostname cannot be
# resolved so that importing this module never fails on such hosts.
try:
    LOCALIP = socket.gethostbyname(socket.gethostname())
except OSError:  # socket.gaierror / socket.herror are OSError subclasses
    LOCALIP = '127.0.0.1'
LOCALPORT = 8000

# Cipher-string exclusions (":!NAME") for every legacy protocol this
# interpreter's ssl module reports as unavailable.
addl_ciphers = []
if not ssl.HAS_SSLv3:
    addl_ciphers.append(':!SSLv3')
if not ssl.HAS_TLSv1:
    addl_ciphers.append(':!TLS1.0')
if not ssl.HAS_TLSv1_1:
    addl_ciphers.append(':!TLS1.1')

# Only report when something is actually unsupported; otherwise the message
# would end in an empty list.
if addl_ciphers:
    print(
        'Your python installation does not support:',
        ', '.join(s.replace(':!','') for s in addl_ciphers)
    )

# Deliberately permissive TLS context used for the upstream leg of the proxy:
# a bare SSLContext() with every cipher enabled except the exclusions above.
# Warnings are silenced because constructing a context this way is noisy on
# recent Python versions.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    INSECURE_CONTEXT = ssl.SSLContext()
    INSECURE_CONTEXT.set_ciphers('ALL' + ''.join(addl_ciphers))
class ProxyProtocol(asyncio.Protocol):
    """One half of a tunnel: forwards every received chunk to a peer transport.

    Two instances are paired by assigning each one's ``linked_transport`` to
    the other's transport. Bytes that arrive before the peer is assigned are
    buffered and flushed as soon as ``linked_transport`` is set, instead of
    failing on a ``None`` peer.

    Args:
        event_loop: The event loop the protocol runs on (kept for callers
            that read the attribute).
        connection_lock: Object with a ``clear()`` method (an
            ``asyncio.Event`` in this file); cleared when either side of the
            tunnel ends.
        linked_transport: Optional peer transport to forward data to. May be
            assigned later through the ``linked_transport`` attribute.
    """

    def __init__(self, event_loop, connection_lock, linked_transport=None):
        self.event_loop = event_loop
        self.connection_lock = connection_lock
        self.transport = None
        self._pending = []  # chunks received before the peer was linked
        self._linked_transport = None
        self.linked_transport = linked_transport

    @property
    def linked_transport(self):
        """The peer transport that received data is written to (or None)."""
        return self._linked_transport

    @linked_transport.setter
    def linked_transport(self, transport):
        self._linked_transport = transport
        if transport is not None and self._pending:
            # Deliver everything that arrived before linking, in order.
            backlog, self._pending = self._pending, []
            transport.write(b''.join(backlog))

    def connection_made(self, transport):
        """Remember our own transport so it can be closed on shutdown."""
        self.transport = transport

    def data_received(self, data):
        """Relay ``data`` to the peer, buffering it if no peer is linked yet."""
        peer = self._linked_transport
        if peer is None:
            self._pending.append(data)
        elif not peer.is_closing():
            peer.write(data)

    def eof_received(self):
        """Tear the tunnel down when this side signals end-of-stream."""
        self._close_tunnel()

    def connection_lost(self, exc):
        """Tear the tunnel down when this side drops (e.g. on a reset)."""
        self._close_tunnel()

    def _close_tunnel(self):
        """Close both transports (when present) and clear the lock."""
        for transport in (self.transport, self._linked_transport):
            if transport is not None:
                transport.close()
        # Protocol callbacks already run on the loop thread, so the lock can
        # be cleared directly; this also works while the loop is shutting down.
        self.connection_lock.clear()
class HttpProxyHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler that only serves the CONNECT method.

    On CONNECT it opens a TCP connection to the requested ``host:port``,
    replies ``200``, and hands both sockets to the asyncio event loop, where a
    pair of ``ProxyProtocol`` instances relay bytes in both directions. The
    upstream leg is wrapped in TLS using ``INSECURE_CONTEXT``. GET, HEAD and
    POST are answered with ``405``.

    Args:
        *args: Positional arguments forwarded to ``BaseHTTPRequestHandler``.
        event_loop: Keyword-only; the asyncio loop (running on another
            thread) that will drive the tunnel.

    Raises:
        ValueError: If ``event_loop`` is missing or falsy.
    """

    def __init__(self, *args, **kwargs):
        # Must be set before super().__init__(): the base class handles the
        # whole request inside its constructor.
        self.event_loop = kwargs.pop('event_loop', None)
        if not self.event_loop:
            raise ValueError('Missing argument: `event_loop`')
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Reject non-CONNECT methods with 405 and a short explanation."""
        body = (
            b'This server accepts CONNECT commands only. '
            b'Configure this connection as an HTTP proxy endpoint to use the CONNECT command.'
        )
        self.send_response(405, 'Method Not Allowed')
        self.send_header('Allow', 'CONNECT')
        self.send_header('Content-Type', 'text/plain; charset=ascii')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        if self.command != 'HEAD':  # a HEAD reply carries headers only
            self.wfile.write(body)
        self.wfile.flush()

    # Class-level aliases: assigning these in __init__ after super().__init__()
    # would be too late, because the request has already been dispatched.
    do_HEAD = do_GET
    do_POST = do_GET

    def do_CONNECT(self):
        """Open the upstream connection and run the tunnel until it ends.

        Replies 400 when the target is not a valid ``host:port``, 502 when
        the upstream TCP connection cannot be established, and 200 otherwise.
        The handler thread then blocks until the tunnel finishes, because
        socketserver shuts the client socket down as soon as the handler
        returns.
        """
        try:
            host, port = self._parse_target()
        except ValueError as e:
            self._send_plain_error(400, 'Bad Request', str(e))
            return
        try:
            sock = socket.create_connection((host, port))
        except OSError as e:
            logging.warning('CONNECT %s failed: %s', self.path, e)
            self._send_plain_error(502, 'Bad Gateway', str(e))
            return
        sock.setblocking(False)
        self.sock_remote = sock
        self.remote_host = host
        self.send_response(200, 'OK')
        self.end_headers()  # without this the status line is never flushed
        self._run_tunnel()
        self.close_connection = True

    def _parse_target(self):
        """Split ``self.path`` into ``(host, port)``; raise ValueError if bad."""
        host, sep, port_text = self.path.rpartition(':')
        if host.startswith('[') and host.endswith(']'):
            host = host[1:-1]  # bracketed IPv6 literal
        if not sep or not host:
            raise ValueError(
                f'Invalid CONNECT target {self.path!r}: expected host:port'
            )
        try:
            port = int(port_text)
        except ValueError:
            raise ValueError(
                f'Invalid CONNECT target {self.path!r}: port is not a number'
            ) from None
        if not 0 < port < 65536:
            raise ValueError(
                f'Invalid CONNECT target {self.path!r}: port out of range'
            )
        return host, port

    def _send_plain_error(self, code, reason, detail):
        """Send a complete plain-text error response and mark it final."""
        body = detail.encode('utf-8', 'replace')
        self.send_response(code, reason)
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Connection', 'close')
        self.end_headers()
        self.wfile.write(body)
        self.wfile.flush()

    def _run_tunnel(self):
        """Schedule ``proxy_connection`` on the loop and wait for it to end."""
        # The loop runs on another thread, so the coroutine has to be
        # submitted through the thread-safe entry point.
        future = asyncio.run_coroutine_threadsafe(
            self.proxy_connection(), self.event_loop
        )
        finished = threading.Event()
        future.add_done_callback(lambda _future: finished.set())
        while not finished.wait(1.0):
            if not self.event_loop.is_running():
                # The loop was stopped: nothing will ever complete the
                # future, so release the upstream socket and give up.
                self.sock_remote.close()
                return
        try:
            future.result()
        except Exception as e:
            logging.warning('Tunnel to %s ended with an error: %s', self.path, e)

    async def proxy_connection(self):
        """Transfer control of the sockets to async event loop"""
        connection_lock = asyncio.Event()
        factory = partial(ProxyProtocol, self.event_loop, connection_lock)
        remote_transport = local_transport = None
        try:
            # server_hostname is mandatory when ssl is combined with sock=.
            remote_transport, remote_protocol = await self.event_loop.create_connection(
                factory, ssl=INSECURE_CONTEXT, sock=self.sock_remote,
                server_hostname=self.remote_host,
            )
            local_transport, local_protocol = await self.event_loop.create_connection(
                factory, sock=self.connection
            )
            remote_protocol.linked_transport = local_transport
            local_protocol.linked_transport = remote_transport
            connection_lock.set()
            # Pass control to the transport chain. Await RST or FIN: the
            # protocols clear the lock on EOF, and a transport that is
            # closing for any other reason also ends the tunnel.
            while connection_lock.is_set() and not (
                local_transport.is_closing() or remote_transport.is_closing()
            ):
                await asyncio.sleep(0.1)
        finally:
            for transport in (remote_transport, local_transport):
                if transport is not None:
                    transport.close()
            if remote_transport is None:
                # The upstream transport was never created, so nothing else
                # owns the raw socket.
                self.sock_remote.close()
class HttpServer():
    """Threaded TCP server for the proxy plus the asyncio loop behind it.

    The event loop runs on a background daemon thread and carries the
    tunnels; the calling thread runs ``ThreadingTCPServer.serve_forever``.
    """

    def __init__(self):
        # The loop is run on a dedicated thread, so create a fresh one rather
        # than relying on get_event_loop(), whose implicit loop creation
        # outside a running loop is deprecated.
        self.event_loop = asyncio.new_event_loop()

    def run(self):
        """Serve until interrupted with Ctrl-C, then shut everything down."""
        factory = partial(HttpProxyHandler, event_loop=self.event_loop)
        server = socketserver.ThreadingTCPServer((LOCALIP, LOCALPORT), factory)
        # Handler threads must not keep the interpreter alive at exit.
        server.daemon_threads = True
        loop_thread = threading.Thread(
            target=self.event_loop.run_forever,
            daemon=True,
        )
        loop_thread.start()
        try:
            print(f'Serving on: http://{LOCALIP}:{LOCALPORT}')
            server.serve_forever()
        except KeyboardInterrupt:
            print('Shutting down...')
        finally:
            server.server_close()
            self._stop_event_loop(loop_thread)

    def _stop_event_loop(self, loop_thread):
        """Stop the loop from this (foreign) thread and close it if possible."""
        if self.event_loop.is_closed():
            return
        # loop.stop() is not thread-safe; ask the loop thread to run it.
        self.event_loop.call_soon_threadsafe(self.event_loop.stop)
        loop_thread.join(timeout=5)
        if self.event_loop.is_running():
            # Still busy after the grace period; the daemon thread dies with
            # the process, and closing a running loop would raise.
            return
        self.event_loop.run_until_complete(self.event_loop.shutdown_asyncgens())
        self.event_loop.close()
def main():
    """Build the proxy server, run it until it returns, then print 'Done.'."""
    proxy = HttpServer()
    proxy.run()
    print('Done.')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment