Created
September 15, 2020 22:35
-
-
Save pdxjohnny/e761949872bd0482d02216531558b1c2 to your computer and use it in GitHub Desktop.
Python HTTP server for zip file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Much of this code is copied from cpython Lib/http/server.py | |
It has been adapted to make a subclass of SimpleHTTPRequestHandler that serves | |
files from a zipfile rather than a directory. | |
Keeping the licensing the same for simplicity | |
LICENSE: https://github.com/python/cpython/raw/3.8/LICENSE | |
Usage: | |
$ python zip_http_server.py myfile.zip --addr 0.0.0.0 | |
""" | |
import io
import sys
import html
import email
import email.utils
import urllib
import urllib.parse
import zipfile
import os.path
import argparse
import datetime
import posixpath
import http.server
import socketserver
from http import HTTPStatus
def index_zip_members(infolist):
    """Build lookup tables for the members of a zip archive.

    Directories that only exist implicitly (the archive holds ``a/b.txt``
    but no ``a/`` entry) are indexed as well, so they can be listed and
    browsed like explicit ones.

    Args:
        infolist: Iterable of ``zipfile.ZipInfo`` objects, for example the
            result of ``ZipFile.infolist()``.

    Returns:
        A ``(files, dirs)`` tuple. ``files`` maps the "/"-separated path of
        every non-directory member (no leading or trailing slash) to its
        ``ZipInfo``. ``dirs`` maps every directory path (``""`` is the
        archive root) to the set of names directly inside it.
    """
    files = {}
    dirs = {"": set()}
    for info in infolist:
        parts = [part for part in info.filename.split("/") if part not in ("", ".")]
        # Members that climb out of the archive root can never be addressed
        # by a sanitised request path, so they are left out of the index.
        if not parts or ".." in parts:
            continue
        parent = ""
        for part in parts[:-1]:
            dirs[parent].add(part)
            parent = posixpath.join(parent, part)
            dirs.setdefault(parent, set())
        leaf = parts[-1]
        dirs[parent].add(leaf)
        path = posixpath.join(parent, leaf)
        if info.is_dir():
            dirs.setdefault(path, set())
        else:
            files[path] = info
    return files, dirs


def make_zip_handler(myzip):
    """Create a request handler class that serves files out of *myzip*.

    Args:
        myzip: An open, readable ``zipfile.ZipFile``. It must stay open for
            as long as the returned handler class is in use.

    Returns:
        A ``SimpleHTTPRequestHandler`` subclass. ``do_GET`` and ``do_HEAD``
        are inherited unchanged; they call the ``send_head`` defined here.
    """
    files, dirs = index_zip_members(myzip.infolist())

    class Handler(http.server.SimpleHTTPRequestHandler):
        """SimpleHTTPRequestHandler that reads from a zip archive."""

        def path_join(self, *args):
            """Join archive path components (zip paths always use "/")."""
            return posixpath.join(*args)

        def path_isdir(self, obj):
            """Return True if *obj* names a directory ("" is the root)."""
            return obj.rstrip("/") in dirs

        def path_exists(self, obj):
            """Return True if *obj* names a file or directory in the archive."""
            return obj in files or self.path_isdir(obj)

        def path_islink(self, obj):
            """Zip members are never treated as symbolic links."""
            return False

        def listdir(self, obj):
            """Return the names directly inside directory *obj*.

            Raises:
                NotADirectoryError: If *obj* is not a directory in the archive.
            """
            try:
                return list(dirs[obj.rstrip("/")])
            except KeyError:
                raise NotADirectoryError(obj) from None

        def _not_modified(self, last_modif):
            """Return True if If-Modified-Since allows a 304 response."""
            headers = self.headers
            if "If-Modified-Since" not in headers or "If-None-Match" in headers:
                return False
            try:
                ims = email.utils.parsedate_to_datetime(headers["If-Modified-Since"])
            except (TypeError, IndexError, OverflowError, ValueError):
                # ignore ill-formed values
                return False
            if ims.tzinfo is None:
                # obsolete format with no timezone, cf.
                # https://tools.ietf.org/html/rfc7231#section-7.1.1.1
                ims = ims.replace(tzinfo=datetime.timezone.utc)
            if ims.tzinfo is not datetime.timezone.utc:
                return False
            return last_modif <= ims

        def send_head(self):
            """Common code for GET and HEAD commands.

            This sends the response code and MIME headers.

            Returns:
                Either a file object (which has to be copied to the output
                by the caller unless the command was HEAD, and must be
                closed by the caller under all circumstances), or None, in
                which case the caller has nothing further to do.
            """
            path = self.translate_path(self.path)
            if self.path_isdir(path):
                parts = urllib.parse.urlsplit(self.path)
                if not parts.path.endswith("/"):
                    # redirect browser - doing basically what apache does
                    self.send_response(HTTPStatus.MOVED_PERMANENTLY)
                    new_parts = (
                        parts[0],
                        parts[1],
                        parts[2] + "/",
                        parts[3],
                        parts[4],
                    )
                    new_url = urllib.parse.urlunsplit(new_parts)
                    self.send_header("Location", new_url)
                    self.end_headers()
                    return None
                for index in "index.html", "index.htm":
                    index = self.path_join(path, index)
                    if index in files:
                        path = index
                        break
                else:
                    return self.list_directory(path)
            # Keys of ``files`` never end in "/", so a file requested with a
            # trailing slash is a 404 here as well (see CPython Issue17324).
            info = files.get(path)
            if info is None:
                self.send_error(HTTPStatus.NOT_FOUND, "File not found")
                return None
            ctype = self.guess_type(path)
            # Zip timestamps carry no timezone; they are treated as UTC.
            # Archives may hold invalid dates (e.g. month 0); in that case
            # the caching headers are simply skipped.
            try:
                last_modif = datetime.datetime(
                    *info.date_time, tzinfo=datetime.timezone.utc
                )
            except ValueError:
                last_modif = None
            # Use browser cache if possible
            if last_modif is not None and self._not_modified(last_modif):
                self.send_response(HTTPStatus.NOT_MODIFIED)
                self.end_headers()
                return None
            try:
                f = myzip.open(info)
            except OSError:
                self.send_error(HTTPStatus.NOT_FOUND, "File not found")
                return None
            try:
                self.send_response(HTTPStatus.OK)
                self.send_header("Content-type", ctype)
                self.send_header("Content-Length", str(info.file_size))
                if last_modif is not None:
                    self.send_header(
                        "Last-Modified",
                        self.date_time_string(last_modif.timestamp()),
                    )
                self.end_headers()
                return f
            except BaseException:
                # Do not leak the member handle if sending headers fails.
                f.close()
                raise

        def list_directory(self, path):
            """Helper to produce a directory listing (absent index.html).

            Returns:
                Either a file object, or None (indicating an error). In
                either case, the headers are sent, making the interface the
                same as for send_head().
            """
            try:
                entries = self.listdir(path)
            except OSError:
                self.send_error(
                    HTTPStatus.NOT_FOUND, "No permission to list directory"
                )
                return None
            entries.sort(key=lambda a: a.lower())
            r = []
            try:
                displaypath = urllib.parse.unquote(
                    self.path, errors="surrogatepass"
                )
            except UnicodeDecodeError:
                displaypath = urllib.parse.unquote(path)
            displaypath = html.escape(displaypath, quote=False)
            enc = sys.getfilesystemencoding()
            title = "Directory listing for %s" % displaypath
            r.append(
                '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
                '"http://www.w3.org/TR/html4/strict.dtd">'
            )
            r.append("<html>\n<head>")
            r.append(
                '<meta http-equiv="Content-Type" '
                'content="text/html; charset=%s">' % enc
            )
            r.append("<title>%s</title>\n</head>" % title)
            r.append("<body>\n<h1>%s</h1>" % title)
            r.append("<hr>\n<ul>")
            for name in entries:
                fullname = self.path_join(path, name)
                displayname = linkname = name
                # Append / for directories or @ for symbolic links
                if self.path_isdir(fullname):
                    displayname = name + "/"
                    linkname = name + "/"
                if self.path_islink(fullname):
                    displayname = name + "@"
                    # Note: a link to a directory displays with @ and links with /
                r.append(
                    '<li><a href="%s">%s</a></li>'
                    % (
                        urllib.parse.quote(linkname, errors="surrogatepass"),
                        html.escape(displayname, quote=False),
                    )
                )
            r.append("</ul>\n<hr>\n</body>\n</html>\n")
            encoded = "\n".join(r).encode(enc, "surrogateescape")
            f = io.BytesIO()
            f.write(encoded)
            f.seek(0)
            self.send_response(HTTPStatus.OK)
            self.send_header("Content-type", "text/html; charset=%s" % enc)
            self.send_header("Content-Length", str(len(encoded)))
            self.end_headers()
            return f

        def translate_path(self, path):
            """Translate a request path into an archive member path.

            Query string and fragment are dropped, percent-escapes are
            decoded and "." / ".." components are removed. The result has no
            leading slash; a trailing slash in the request is preserved
            (except for the root, which is always "").
            """
            # abandon query parameters
            path = path.split("?", 1)[0]
            path = path.split("#", 1)[0]
            # Don't forget explicit trailing slash when normalizing. Issue17324
            trailing_slash = path.rstrip().endswith("/")
            try:
                path = urllib.parse.unquote(path, errors="surrogatepass")
            except UnicodeDecodeError:
                path = urllib.parse.unquote(path)
            path = posixpath.normpath(path)
            words = [
                word
                for word in path.split("/")
                if word and word not in (posixpath.curdir, posixpath.pardir)
            ]
            path = "/".join(words)
            if path and trailing_slash:
                path += "/"
            return path

    return Handler


def main(argv=None):
    """Parse the command line and serve a zip file over HTTP until killed.

    Args:
        argv: Optional list of command-line arguments (without the program
            name). Defaults to ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description="Serve the contents of a zip file over HTTP."
    )
    parser.add_argument("zipfile", help="Zip filename to serve from")
    parser.add_argument("--addr", default="127.0.0.1", help="Address to bind to")
    parser.add_argument("--port", type=int, default=8000, help="Port to use")
    args = parser.parse_args(argv)
    # Avoid "address in use" errors when restarting quickly
    socketserver.TCPServer.allow_reuse_address = True
    with zipfile.ZipFile(args.zipfile, "r") as myzip:
        handler = make_zip_handler(myzip)
        with socketserver.TCPServer((args.addr, args.port), handler) as httpd:
            print("Listening on http://%s:%d/" % httpd.socket.getsockname())
            httpd.serve_forever()
# Run the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment