Skip to content

Instantly share code, notes, and snippets.

@jart
Last active July 9, 2023 01:18
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jart/cd980c3f955f637400e5d9d14780bb70 to your computer and use it in GitHub Desktop.
Save jart/cd980c3f955f637400e5d9d14780bb70 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2
import BaseHTTPServer
import SocketServer
import base64
import errno
import httplib
import mimetypes
import os
import shutil
import ssl
import stat
import subprocess
import sys
import tempfile
import textwrap
import urlparse
# Command-line configuration: argv[1] is the directory that receives the
# generated wget/curl wrapper scripts, argv[2] is the root directory of the
# on-disk download cache.
BIN = sys.argv[1]
CACHE = sys.argv[2]

# TLS context shared by every outbound HTTPS connection the proxy makes.
# The proxy acts as the *client* of the upstream servers, so the default trust
# store must be loaded for Purpose.SERVER_AUTH (CAs trusted to authenticate
# servers). Purpose.CLIENT_AUTH is intended for server-side sockets that verify
# client certificates.
CONTEXT = ssl.SSLContext(ssl.PROTOCOL_TLS)
CONTEXT.verify_mode = ssl.CERT_REQUIRED
CONTEXT.check_hostname = True
CONTEXT.load_default_certs(ssl.Purpose.SERVER_AUTH)
def mkdir(path):
    """Makes directory with same behavior as mkdir -p.

    Missing parent directories are created as needed, and an already
    existing directory is not an error.

    Args:
      path: directory to create.

    Raises:
      OSError: if creation fails for any reason other than the directory
        already existing. This includes the case where `path` exists but is
        not a directory, which `mkdir -p` also rejects.
    """
    try:
        os.makedirs(path)
    except OSError as e:
        # EEXIST is also reported when a regular file occupies the path, so
        # only swallow it when a directory is really there.
        if e.errno != errno.EEXIST or not os.path.isdir(path):
            raise
def run(command, quiet=None):
    """Runs a shell command and returns its stripped stdout, or None on failure.

    Args:
      command: command line, interpreted by the shell.
      quiet: when truthy, the child's stderr is captured (and discarded)
        instead of being inherited from this process.

    Returns:
      The child's standard output with trailing whitespace removed when it
      exits with status 0; None for any other exit status.
    """
    stderr_target = quiet and subprocess.PIPE
    child = subprocess.Popen(command,
                             shell=True,
                             stdout=subprocess.PIPE,
                             stderr=stderr_target)
    captured_stdout, _ = child.communicate()
    if child.returncode != 0:
        return None
    return captured_stdout.rstrip()
def hack_download_commands():
    """Installs wget/curl wrapper scripts in BIN that rewrite https:// to http://.

    For each of `wget` and `curl` found on PATH, a small /bin/sh script with
    the same name is written into the BIN directory and marked executable.
    The wrapper rewrites every argument that starts with "https://" to use
    "http://" and then execs the real tool. Presumably this lets the plain
    HTTP proxy in this file see (and cache) the request while the proxy itself
    performs the TLS connection -- confirm against how the caller sets
    http_proxy and PATH.

    If the tool accepts `--no-hsts`, that flag is prepended to the argument
    list (presumably so HSTS does not upgrade the URL back to https).
    """
    # The template must start with "#!/bin/sh" on its very first line. A raw
    # string would keep the backslash-newline after the opening quotes as
    # literal text, which both hides the shebang behind a stray "\" line and
    # prevents dedent() from finding a common margin.
    template = textwrap.dedent('''\
        #!/bin/sh
        cleared=
        for x; do
          test "$cleared" || set -- %s ; cleared=1
          if [ $(expr -- "$x" : https://) -gt 0 ]; then
            x="http${x#https}"
          fi
          set -- "$@" "$x"
        done
        exec '%s' "$@"
        ''')
    for cmd in ('wget', 'curl'):
        # Absolute path of the real tool; skip tools that are not installed.
        original = run('command -v %s' % cmd)
        if original is None:
            continue
        # Probe for --no-hsts support; older versions reject the flag.
        lol = '--no-hsts' if run('%s --no-hsts --version' % cmd, quiet=True) else ''
        path = os.path.join(BIN, cmd)
        with open(path, 'w') as fp:
            fp.write(template % (lol, original))
        os.chmod(path, os.stat(path).st_mode | stat.S_IEXEC)
def connect(url, lasturl=None):
    """Opens a connection for `url`, sends a GET and returns the response.

    Args:
      url: urlparse result to fetch. Scheme and/or netloc may be empty (as in
        a relative redirect target), in which case they come from `lasturl`.
      lasturl: urlparse result of the previous request; only consulted for
        the parts missing from `url`.

    Returns:
      A (connection, response) tuple. The caller owns the connection and
      must close it once the response has been consumed.

    Raises:
      Whatever httplib/socket/ssl raise while connecting or reading the
      response head; the connection is closed before the error propagates.
    """
    scheme = url.scheme or lasturl.scheme
    origin = url if url.netloc else lasturl
    # hostname/port understand bracketed IPv6 literals and userinfo, which a
    # plain netloc.split(':') does not. The port is always passed explicitly
    # so httplib never tries to re-parse a bare IPv6 address for one.
    host = origin.hostname
    if scheme == 'https':
        port = origin.port or httplib.HTTPS_PORT
        c = httplib.HTTPSConnection(host, port, context=CONTEXT)
    else:
        port = origin.port or httplib.HTTP_PORT
        c = httplib.HTTPConnection(host, port)
    try:
        # Request target is path[;params][?query]; the fragment is client-side
        # only and must not be sent to the server.
        pu = urlparse.ParseResult('', '', url.path, url.params, url.query, '')
        c.putrequest('GET', pu.geturl())
        c.endheaders()
        return c, c.getresponse()
    except BaseException:
        # Do not leak the socket on any failure (including interrupts).
        c.close()
        raise
class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Caching forward-proxy request handler.

    Accepts proxy-style GET requests whose request target is an absolute
    http:// or https:// URL. Successful (200) upstream bodies are streamed to
    the client and stored under CACHE/<netloc><path>; later requests for the
    same netloc and path are answered from that file without contacting the
    upstream server.

    NOTE: the query string is not part of the cache key, so URLs that differ
    only in their query share one cache entry.
    """

    # Upstream statuses that are followed as redirects for GET.
    _REDIRECT_STATUSES = (301, 302, 303, 307, 308)
    # Request counter limit: the original request plus followed redirects.
    _MAX_HOPS = 10
    # Bytes copied per read so large downloads are never held in memory whole.
    _CHUNK_SIZE = 64 * 1024

    def go(self):
        """Handles one request: serve from cache, or fetch, relay and cache."""
        if self.command != 'GET':
            self.send_error(405, 'Proxy only supports GET')
            return
        ru = urlparse.urlparse(self.path)
        if not ru.netloc or ru.scheme not in ('http', 'https'):
            self.send_error(400, 'Proxy needs http[s]://netloc')
            return

        stash = '%s/%s%s' % (CACHE, ru.netloc, ru.path)
        # Containment check against the *resolved* cache root, with a trailing
        # separator so a sibling such as "<CACHE>-evil" cannot pass as a prefix.
        root = os.path.realpath(CACHE).rstrip(os.sep) + os.sep
        if not os.path.realpath(stash).startswith(root):
            self.send_error(400, 'Evil path')
            return

        if os.path.isfile(stash):
            self.send_response(200)
            self.send_header('Content-Type', (mimetypes.guess_type(stash)[0] or
                                              'application/binary'))
            self.send_header('Content-Length', os.path.getsize(stash))
            self.end_headers()
            with open(stash, 'rb') as r:
                shutil.copyfileobj(r, self.wfile)
            self.wfile.flush()
            return

        # A URL that maps onto a directory cannot be stored as a file; such
        # requests are relayed without caching.
        ntemp = None
        if not (stash.endswith('/') or os.path.isdir(stash)):
            folder = os.path.dirname(stash)
            mkdir(folder)
            # Create the temp file next to its destination so the final
            # rename never crosses a filesystem boundary.
            ntemp = tempfile.NamedTemporaryFile(dir=folder)

        c = None
        try:
            c, r = connect(ru)
            hops = 1
            while r.status in self._REDIRECT_STATUSES:
                hops += 1
                if hops > self._MAX_HOPS:
                    self.send_error(408, 'Loop detected')
                    return
                location = r.getheader('location')
                if not location:
                    self.send_error(502, 'Redirect without Location')
                    return
                # Resolve against the current URL so relative targets keep
                # their scheme and host across several hops.
                ru2 = urlparse.urlparse(urlparse.urljoin(ru.geturl(), location))
                c.close()
                c = None
                c, r = connect(ru2, ru)
                ru = ru2

            if r.status != 200 and ntemp is not None:
                # Only successful responses are cached.
                ntemp.close()
                ntemp = None

            self.send_response(r.status)
            for h in ('Content-Type', 'Content-Length', 'Content-Disposition'):
                v = r.getheader(h)
                if v:
                    self.send_header(h, v)
            self.end_headers()
            while True:
                buf = r.read(self._CHUNK_SIZE)
                if not buf:
                    break
                if ntemp is not None:
                    ntemp.write(buf)
                self.wfile.write(buf)
            self.wfile.flush()
            if ntemp is not None:
                # Publish the complete body under its final name.
                ntemp.flush()
                os.rename(ntemp.name, stash)
        except EnvironmentError as e:
            # EnvironmentError covers both OSError and the IOError-derived
            # socket.error; a client that hung up mid-transfer is not an error.
            if e.errno != errno.EPIPE:
                raise
        finally:
            if c is not None:
                c.close()
            if ntemp is not None:
                try:
                    ntemp.close()
                except OSError as e:
                    # After a successful rename the temp name is gone, so the
                    # delete-on-close reports "no such file"; that is expected.
                    if e.errno != errno.ENOENT:
                        raise

    do_GET = go
    do_HEAD = go
class ThreadedHTTPServer(SocketServer.ThreadingMixIn,
BaseHTTPServer.HTTPServer):
daemon_threads = True
# Script entry: install the wget/curl wrappers, bind the proxy to an
# OS-assigned port on the loopback interface, print that port number on
# stdout, then serve requests until the process is terminated.
hack_download_commands()

listen_address = ('127.0.0.1', 0)  # port 0 asks the OS for any free port
server = ThreadedHTTPServer(listen_address, Handler)

sys.stdout.write('%d\n' % server.server_port)
sys.stdout.flush()  # make the port visible immediately, even through a pipe

server.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment