|
#!/usr/bin/env python3 |
|
|
|
""" |
|
Serve the contents of a [TeX Live](https://www.tug.org/texlive/) ISO image over HTTP. Well suited to offline and non-root environments, especially when you cannot `mount -o loop` the image. |
|
|
|
Requirements |
|
|
|
- curl, perl, python3, [pycdlib](https://clalancette.github.io/pycdlib/) |
|
|
|
Download the ISO image: |
|
|
|
```shell |
|
curl -O https://mirror.ctan.org/systems/texlive/Images/texlive2022.iso |
|
``` |
|
|
|
Download the network installer: |
|
|
|
```shell |
|
curl -O https://mirror.ctan.org/systems/texlive/tlnet/install-tl-unx.tar.gz |
|
``` |
|
|
|
Run the server: |
|
|
|
```shell |
|
python3 tl-iso-server.py |
|
``` |
|
|
|
Install TeX Live 2022: |
|
|
|
```shell |
|
perl install-tl -no-gui --no-interaction -repository http://localhost:8000/ -paper a4 -texdir /whatever/texlive/2022 |
|
``` |
|
|
|
""" |
|
|
|
|
|
# Version of this server; it is appended to the handlers' server_version
# strings ("BaseHTTP/..." and "SimpleHTTP/...").
__version__ = "0.0"
|
|
|
import pycdlib |
|
from pycdlib.dr import DirectoryRecord |
|
from pycdlib.dates import DirectoryRecordDate |
|
|
|
import datetime |
|
import email.utils |
|
import html |
|
import http.client |
|
import io |
|
import mimetypes |
|
import posixpath |
|
import re |
|
import socket # For gethostbyaddr() |
|
import socketserver |
|
import sys |
|
import time |
|
import urllib.parse |
|
import shutil |
|
from http import HTTPStatus |
|
|
|
|
|
# Default error message template.
# BaseHTTPRequestHandler.send_error() fills it in with %-formatting using the
# keys "code", "message" and "explain".
DEFAULT_ERROR_MESSAGE = """\
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"
"http://www.w3.org/TR/html4/strict.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=utf-8">
<title>Error response</title>
</head>
<body>
<h1>Error response</h1>
<p>Error code: %(code)d</p>
<p>Message: %(message)s.</p>
<p>Error code explanation: %(code)s - %(explain)s.</p>
</body>
</html>
"""

# Content-Type header value sent with the error page built from the template.
DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8"
|
|
|
|
|
class HTTPServer(socketserver.TCPServer):
    """TCP server that remembers the name and port it was bound to."""

    # Seems to make sense in testing environment
    allow_reuse_address = 1

    def server_bind(self):
        """Bind the socket, then record server_name and server_port."""
        socketserver.TCPServer.server_bind(self)
        bound_host, bound_port = self.server_address[:2]
        self.server_name = socket.getfqdn(bound_host)
        self.server_port = bound_port
|
|
|
|
|
class ThreadingHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
    """HTTPServer combined with socketserver.ThreadingMixIn."""

    # Option read by socketserver.ThreadingMixIn.
    daemon_threads = True
|
|
|
|
|
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
    """Base class for HTTP request handlers.

    handle() reads requests from self.rfile; parse_request() fills in
    self.command, self.path, self.request_version and self.headers; the
    request is then dispatched to a method named "do_" + command (for example
    do_GET) that subclasses are expected to provide. Responses are written to
    self.wfile.
    """

    # The Python system version, truncated to its first component.
    sys_version = "Python/" + sys.version.split()[0]

    # The server software version.  You may want to override this.
    # The format is multiple whitespace-separated strings,
    # where each string is of the form name[/version].
    server_version = "BaseHTTP/" + __version__

    error_message_format = DEFAULT_ERROR_MESSAGE
    error_content_type = DEFAULT_ERROR_CONTENT_TYPE

    # The default request version.  This only affects responses up until
    # the point where the request line is parsed, so it mainly decides what
    # the client gets back when sending a malformed request line.
    # Most web servers default to HTTP 0.9, i.e. don't send a status line.
    default_request_version = "HTTP/0.9"

    def parse_request(self):
        """Parse a request (internal).

        The request should be stored in self.raw_requestline; the results
        are in self.command, self.path, self.request_version and
        self.headers.

        Return True for success, False for failure; on failure, any relevant
        error response has already been sent back.
        """
        self.command = None  # set in case of error on the first line
        self.request_version = version = self.default_request_version
        self.close_connection = True
        requestline = str(self.raw_requestline, "iso-8859-1")
        requestline = requestline.rstrip("\r\n")
        self.requestline = requestline
        words = requestline.split()
        if not words:
            return False

        if len(words) >= 3:  # Enough to determine protocol version
            version = words[-1]
            try:
                if not version.startswith("HTTP/"):
                    raise ValueError
                base_version_number = version.split("/", 1)[1]
                version_number = base_version_number.split(".")
                # RFC 2145 section 3.1 says there can be only one "." and
                #   - major and minor numbers MUST be treated as
                #      separate integers;
                #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
                #      turn is lower than HTTP/12.3;
                #   - Leading zeros MUST be ignored by recipients.
                if len(version_number) != 2:
                    raise ValueError
                # int() alone would also accept "+1", " 1" or "1_0"; insist
                # on plain digits and a sane length before converting.
                if any(not component.isdigit() for component in version_number):
                    raise ValueError("non digit in http version")
                if any(len(component) > 10 for component in version_number):
                    raise ValueError("unreasonable length http version")
                version_number = int(version_number[0]), int(version_number[1])
            except (ValueError, IndexError):
                self.send_error(HTTPStatus.BAD_REQUEST, "Bad request version (%r)" % version)
                return False
            if version_number >= (1, 1) and self.protocol_version >= "HTTP/1.1":
                self.close_connection = False
            if version_number >= (2, 0):
                self.send_error(
                    HTTPStatus.HTTP_VERSION_NOT_SUPPORTED,
                    "Invalid HTTP version (%s)" % base_version_number,
                )
                return False
            self.request_version = version

        if not 2 <= len(words) <= 3:
            self.send_error(HTTPStatus.BAD_REQUEST, "Bad request syntax (%r)" % requestline)
            return False
        command, path = words[:2]
        if len(words) == 2:
            # Two-word request line: HTTP/0.9 style, only GET is allowed.
            self.close_connection = True
            if command != "GET":
                self.send_error(HTTPStatus.BAD_REQUEST, "Bad HTTP/0.9 request type (%r)" % command)
                return False
        self.command, self.path = command, path

        # gh-87389: The purpose of replacing '//' with '/' is to protect
        # against open redirect attacks possibly triggered if the path starts
        # with '//' because http clients treat //path as an absolute URI
        # without scheme (similar to http://path) rather than a path.
        if self.path.startswith("//"):
            self.path = "/" + self.path.lstrip("/")  # Reduce to a single /

        # Examine the headers and look for a Connection directive.
        try:
            self.headers = http.client.parse_headers(self.rfile, _class=self.MessageClass)
        except http.client.LineTooLong as err:
            self.send_error(HTTPStatus.REQUEST_HEADER_FIELDS_TOO_LARGE, "Line too long", str(err))
            return False
        except http.client.HTTPException as err:
            self.send_error(HTTPStatus.REQUEST_HEADER_FIELDS_TOO_LARGE, "Too many headers", str(err))
            return False

        conntype = self.headers.get("Connection", "")
        if conntype.lower() == "close":
            self.close_connection = True
        elif conntype.lower() == "keep-alive" and self.protocol_version >= "HTTP/1.1":
            self.close_connection = False
        # Examine the headers and look for an Expect directive
        expect = self.headers.get("Expect", "")
        if (
            expect.lower() == "100-continue"
            and self.protocol_version >= "HTTP/1.1"
            and self.request_version >= "HTTP/1.1"
        ):
            if not self.handle_expect_100():
                return False
        return True

    def handle_expect_100(self):
        """Decide what to do with an "Expect: 100-continue" header.

        If the client is expecting a 100 Continue response, we must
        respond with either a 100 Continue or a final response before
        waiting for the request body. The default is to always respond
        with a 100 Continue. You can behave differently (for example,
        reject unauthorized requests) by overriding this method.

        This method should either return True (possibly after sending
        a 100 Continue response) or send an error response and return
        False.
        """
        self.send_response_only(HTTPStatus.CONTINUE)
        self.end_headers()
        return True

    def handle_one_request(self):
        """Handle a single HTTP request.

        Reads one request line, parses it, and calls the matching
        do_<COMMAND>() method; sends 501 if no such method exists.
        """
        try:
            self.raw_requestline = self.rfile.readline(65537)
            if len(self.raw_requestline) > 65536:
                self.requestline = ""
                self.request_version = ""
                self.command = ""
                self.send_error(HTTPStatus.REQUEST_URI_TOO_LONG)
                return
            if not self.raw_requestline:
                self.close_connection = True
                return
            if not self.parse_request():
                # An error code has been sent, just exit
                return
            mname = "do_" + self.command
            if not hasattr(self, mname):
                self.send_error(HTTPStatus.NOT_IMPLEMENTED, "Unsupported method (%r)" % self.command)
                return
            method = getattr(self, mname)
            method()
            self.wfile.flush()  # actually send the response if not already done.
        except TimeoutError as e:
            # a read or a write timed out.  Discard this connection
            self.log_error("Request timed out: %r", e)
            self.close_connection = True
            return

    def handle(self):
        """Handle multiple requests if necessary."""
        self.close_connection = True

        self.handle_one_request()
        while not self.close_connection:
            self.handle_one_request()

    def send_error(self, code, message=None, explain=None):
        """Send and log an error reply.

        Arguments are
        * code:    an HTTP error code
                   3 digits
        * message: a simple optional 1 line reason phrase.
                   *( HTAB / SP / VCHAR / %x80-FF )
                   defaults to short entry matching the response code
        * explain: a detailed message defaults to the long entry
                   matching the response code.

        This sends an error response (so it must be called before any
        output has been generated), logs the error, and finally sends
        a piece of HTML explaining the error to the user.
        """
        try:
            shortmsg, longmsg = self.responses[code]
        except KeyError:
            shortmsg, longmsg = "???", "???"
        if message is None:
            message = shortmsg
        if explain is None:
            explain = longmsg
        self.log_error("code %d, message %s", code, message)
        self.send_response(code, message)
        self.send_header("Connection", "close")

        # Message body is omitted for cases described in:
        #  - RFC7230: 3.3. 1xx, 204(No Content), 304(Not Modified)
        #  - RFC7231: 6.3.6. 205(Reset Content)
        body = None
        if code >= 200 and code not in (
            HTTPStatus.NO_CONTENT,
            HTTPStatus.RESET_CONTENT,
            HTTPStatus.NOT_MODIFIED,
        ):
            # HTML encode to prevent Cross Site Scripting attacks
            # (see bug #1100201)
            content = self.error_message_format % {
                "code": code,
                "message": html.escape(message, quote=False),
                "explain": html.escape(explain, quote=False),
            }
            body = content.encode("UTF-8", "replace")
            self.send_header("Content-Type", self.error_content_type)
            self.send_header("Content-Length", str(len(body)))
        self.end_headers()

        if self.command != "HEAD" and body:
            self.wfile.write(body)

    def send_response(self, code, message=None):
        """Add the response header to the headers buffer and log the
        response code.

        Also send two standard headers with the server software
        version and the current date.
        """
        self.log_request(code)
        self.send_response_only(code, message)
        self.send_header("Server", self.version_string())
        self.send_header("Date", self.date_time_string())

    def send_response_only(self, code, message=None):
        """Send the response header only."""
        if self.request_version != "HTTP/0.9":
            if message is None:
                if code in self.responses:
                    message = self.responses[code][0]
                else:
                    message = ""
            if not hasattr(self, "_headers_buffer"):
                self._headers_buffer = []
            self._headers_buffer.append(
                ("%s %d %s\r\n" % (self.protocol_version, code, message)).encode("latin-1", "strict")
            )

    def send_header(self, keyword, value):
        """Send a MIME header to the headers buffer."""
        if self.request_version != "HTTP/0.9":
            if not hasattr(self, "_headers_buffer"):
                self._headers_buffer = []
            self._headers_buffer.append(("%s: %s\r\n" % (keyword, value)).encode("latin-1", "strict"))

        if keyword.lower() == "connection":
            if value.lower() == "close":
                self.close_connection = True
            elif value.lower() == "keep-alive":
                self.close_connection = False

    def end_headers(self):
        """Send the blank line ending the MIME headers."""
        if self.request_version != "HTTP/0.9":
            # Same lazy creation as send_header(), so calling end_headers()
            # with nothing buffered does not raise AttributeError.
            if not hasattr(self, "_headers_buffer"):
                self._headers_buffer = []
            self._headers_buffer.append(b"\r\n")
            self.flush_headers()

    def flush_headers(self):
        """Write any buffered header lines to self.wfile and clear the buffer."""
        if hasattr(self, "_headers_buffer"):
            self.wfile.write(b"".join(self._headers_buffer))
            self._headers_buffer = []

    def log_request(self, code="-", size="-"):
        """Log an accepted request.

        This is called by send_response().
        """
        if isinstance(code, HTTPStatus):
            code = code.value
        self.log_message('"%s" %s %s', self.requestline, str(code), str(size))

    def log_error(self, format, *args):
        """Log an error.

        This is called when a request cannot be fulfilled.  By
        default it passes the message on to log_message().

        Arguments are the same as for log_message().

        XXX This should go to the separate error log.
        """
        self.log_message(format, *args)

    # Translation table that renders C0/C1 control characters (and the
    # backslash itself) as visible escapes in log output.
    _control_char_table = str.maketrans(
        {c: "\\x%02x" % c for c in (*range(0x20), *range(0x7F, 0xA0))}
    )
    _control_char_table[ord("\\")] = "\\\\"

    def log_message(self, format, *args):
        """Log an arbitrary message.

        This is used by all other logging functions.  Override
        it if you have specific logging wishes.

        The first argument, FORMAT, is a format string for the
        message to be logged.  If the format string contains
        any % escapes requiring parameters, they should be
        specified as subsequent arguments (it's just like
        printf!).

        The client ip and current date/time are prefixed to
        every message.

        Control characters in the formatted message are escaped, because the
        message usually contains client-supplied text (the request line).
        """
        message = format % args
        sys.stderr.write(
            "%s - - [%s] %s\n"
            % (
                self.address_string(),
                self.log_date_time_string(),
                message.translate(self._control_char_table),
            )
        )

    def version_string(self):
        """Return the server software version string."""
        return self.server_version + " " + self.sys_version

    def date_time_string(self, timestamp=None):
        """Return a date and time formatted for a message header.

        timestamp is a number of seconds since the epoch (as returned by
        time.time()); the current time is used when it is None.
        """
        if timestamp is None:
            timestamp = time.time()

        return email.utils.formatdate(timestamp, usegmt=True)

    def log_date_time_string(self):
        """Return the current time formatted for logging."""
        now = time.time()
        year, month, day, hh, mm, ss, x, y, z = time.localtime(now)
        s = "%02d/%3s/%04d %02d:%02d:%02d" % (day, self.monthname[month], year, hh, mm, ss)
        return s

    weekdayname = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]

    monthname = [None, "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]

    def address_string(self):
        """Return the client address."""
        return self.client_address[0]

    # Essentially static class variables

    # The version of the HTTP protocol we support.
    # Set this to HTTP/1.1 to enable automatic keepalive
    protocol_version = "HTTP/1.0"

    # MessageClass used to parse headers
    MessageClass = http.client.HTTPMessage

    # hack to maintain backwards compatibility
    responses = {v: (v.phrase, v.description) for v in HTTPStatus.__members__.values()}
|
|
|
|
|
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    """HTTP request handler with GET and HEAD commands, backed by an ISO image.

    Files and directory listings are looked up in the pycdlib ISO object
    given to the constructor, using Joliet paths. The MIME type for files
    is determined by calling the .guess_type() method.

    The GET and HEAD requests are identical except that the HEAD
    request omits the actual contents of the file.
    """

    server_version = "SimpleHTTP/" + __version__
    extensions_map = _encodings_map_default = {
        ".gz": "application/gzip",
        ".Z": "application/octet-stream",
        ".bz2": "application/x-bzip2",
        ".xz": "application/x-xz",
    }

    def __init__(self, *args, iso=None, archive_r=None, **kwargs):
        """Create the handler.

        iso is the opened ISO object to serve from (required). archive_r
        maps an unversioned archive file name to the name actually stored
        under /archive; it defaults to an empty mapping.

        Raises RuntimeError when iso is not given.
        """
        if iso is None:
            raise RuntimeError("SimpleHTTPRequestHandler requires iso=<opened ISO>")
        self.iso = iso
        # A fresh dict per instance instead of a shared mutable default.
        self.archive_r = {} if archive_r is None else archive_r
        # The base class handles the request inside __init__, so the
        # attributes above must be set before calling it.
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Serve a GET request."""
        f = self.send_head()
        if f is not None:
            # copyfile() closes the source when it is done.
            self.copyfile(f, self.wfile)

    def do_HEAD(self):
        """Serve a HEAD request."""
        f = self.send_head()
        if f is not None:
            f.close()

    def send_head(self):
        """Common code for GET and HEAD commands.

        This sends the response code and MIME headers.

        Return value is either a file object (which has to be copied
        to the outputfile by the caller unless the command was HEAD,
        and must be closed by the caller under all circumstances), or
        None, in which case the caller has nothing further to do.
        """
        path = self.translate_path(self.path)

        if posixpath.dirname(path) == "/archive":
            # Map the unversioned archive name to the stored one. Names
            # without a mapping are looked up unchanged, so a missing file
            # ends in the 404 below instead of a KeyError.
            name = posixpath.basename(path)
            path = "/archive/" + self.archive_r.get(name, name)

        try:
            record = self.iso.get_record(joliet_path=path)
        except pycdlib.pycdlibexception.PyCdlibInvalidInput:
            self.send_error(HTTPStatus.NOT_FOUND, "File not found")
            return None

        if record.is_dir():
            return self.list_directory(path)

        ctype = self.guess_type(path)
        # check for trailing "/" which should return 404. See Issue17324
        # The test for this was added in test_httpserver.py
        # However, some OS platforms accept a trailingSlash as a filename
        # See discussion on python-dev and Issue34711 regarding
        # parsing and rejection of filenames with a trailing slash
        if self.path.endswith("/"):
            self.send_error(HTTPStatus.NOT_FOUND, "File not found")
            return None

        # NOTE(review): the record's date fields are combined into a naive
        # datetime, so .timestamp() interprets them in the server's local
        # time zone -- confirm whether the ISO's GMT offset should be used.
        date = record.date
        timestamp = datetime.datetime(
            date.years_since_1900 + 1900,
            date.month,
            date.day_of_month,
            date.hour,
            date.minute,
            date.second,
        ).timestamp()

        # Use browser cache if possible
        if "If-Modified-Since" in self.headers and "If-None-Match" not in self.headers:
            # compare If-Modified-Since and time of last file modification
            try:
                ims = email.utils.parsedate_to_datetime(self.headers["If-Modified-Since"])
            except (TypeError, IndexError, OverflowError, ValueError):
                # ignore ill-formed values
                pass
            else:
                if ims.tzinfo is None:
                    # obsolete format with no timezone, cf.
                    # https://tools.ietf.org/html/rfc7231#section-7.1.1.1
                    ims = ims.replace(tzinfo=datetime.timezone.utc)
                if ims.tzinfo is datetime.timezone.utc:
                    # compare to UTC datetime of last modification
                    last_modif = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
                    # remove microseconds, like in If-Modified-Since
                    last_modif = last_modif.replace(microsecond=0)
                    if last_modif <= ims:
                        self.send_response(HTTPStatus.NOT_MODIFIED)
                        self.end_headers()
                        return None

        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", ctype)
        self.send_header("Content-Length", str(record.inode.get_data_length()))
        self.send_header("Last-Modified", self.date_time_string(timestamp))
        self.end_headers()

        return self.iso.open_file_from_iso(joliet_path=path)

    @staticmethod
    def _entry_name(record):
        """Return the text name of a directory child record.

        The "." and ".." entries are handled separately because their
        identifiers are short ASCII byte strings, not UTF-16 names
        (presumably how pycdlib reports them -- verify against pycdlib).
        """
        raw = record.file_identifier()
        if raw in (b".", b".."):
            return raw.decode("ascii")
        return raw.decode("utf-16_be")

    def list_directory(self, path):
        """Helper to produce a directory listing (absent index.html).

        Return value is either a file object, or None (indicating an
        error).  In either case, the headers are sent, making the
        interface the same as for send_head().
        """
        try:
            # Each child record already knows whether it is a directory, so
            # no second lookup per entry is needed.
            entries = [
                (self._entry_name(child), child.is_dir())
                for child in self.iso.list_children(joliet_path=path)
            ]
        except (OSError, pycdlib.pycdlibexception.PyCdlibInvalidInput):
            self.send_error(HTTPStatus.NOT_FOUND, "No permission to list directory")
            return None

        entries.sort(key=lambda entry: entry[0].lower())
        r = []
        try:
            displaypath = urllib.parse.unquote(self.path, errors="surrogatepass")
        except UnicodeDecodeError:
            displaypath = urllib.parse.unquote(path)
        displaypath = html.escape(displaypath, quote=False)
        enc = sys.getfilesystemencoding()
        title = "Directory listing for %s" % displaypath
        r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" ' '"http://www.w3.org/TR/html4/strict.dtd">')
        r.append("<html>\n<head>")
        r.append('<meta http-equiv="Content-Type" ' 'content="text/html; charset=%s">' % enc)
        r.append("<title>%s</title>\n</head>" % title)
        r.append("<body>\n<h1>%s</h1>" % title)
        r.append("<hr>\n<ul>")
        for name, is_dir in entries:
            if name == ".":
                continue
            if name == ".." and path == "/":
                continue
            displayname = linkname = name

            # Append / for directories
            if is_dir:
                displayname = name + "/"
                linkname = name + "/"

            r.append(
                '<li><a href="%s">%s</a></li>'
                % (urllib.parse.quote(linkname, errors="surrogatepass"), html.escape(displayname, quote=False))
            )
        r.append("</ul>\n<hr>\n</body>\n</html>\n")
        encoded = "\n".join(r).encode(enc, "surrogateescape")
        f = io.BytesIO()
        f.write(encoded)
        f.seek(0)
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", "text/html; charset=%s" % enc)
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        return f

    def translate_path(self, path):
        """Translate a request PATH to a normalized path inside the ISO.

        The query string and fragment are dropped, percent-escapes are
        decoded, and the result is normalized with posixpath.normpath()
        (which also removes any trailing slash).
        """
        # abandon query parameters
        path = path.split("?", 1)[0]
        path = path.split("#", 1)[0]
        try:
            path = urllib.parse.unquote(path, errors="surrogatepass")
        except UnicodeDecodeError:
            path = urllib.parse.unquote(path)
        return posixpath.normpath(path)

    def copyfile(self, source, outputfile):
        """Copy all data between two file objects.

        The SOURCE argument is a file object open for reading that is also
        a context manager; it is closed once the copy is finished. The
        DESTINATION argument is a file object open for writing (or anything
        with a write() method).

        The only reason for overriding this would be to change
        the block size or perhaps to replace newlines by CRLF
        -- note however that the default server uses this
        to copy binary data as well.
        """
        with source as source_fp:
            shutil.copyfileobj(source_fp, outputfile)

    def guess_type(self, path):
        """Guess the type of a file.

        Argument is a PATH (a filename).

        Return value is a string of the form type/subtype,
        usable for a MIME Content-type header.

        The extension is looked up in self.extensions_map first (exact
        case, then lower-cased), then passed to mimetypes.guess_type();
        application/octet-stream is the fallback.
        """
        base, ext = posixpath.splitext(path)
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        ext = ext.lower()
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        guess, _ = mimetypes.guess_type(path)
        if guess:
            return guess
        return "application/octet-stream"
|
|
|
|
|
def _get_best_family(*address): |
|
infos = socket.getaddrinfo( |
|
*address, |
|
type=socket.SOCK_STREAM, |
|
flags=socket.AI_PASSIVE, |
|
) |
|
family, type, proto, canonname, sockaddr = next(iter(infos)) |
|
return family, sockaddr |
|
|
|
|
|
def serve(HandlerClass=BaseHTTPRequestHandler, ServerClass=ThreadingHTTPServer, protocol="HTTP/1.0", port=8000, bind=None):
    """Run an HTTP server until it is interrupted.

    The address family and socket address are derived from bind and port
    (port 8000 unless given otherwise) and stored on ServerClass; protocol is
    stored on HandlerClass as protocol_version. On Ctrl-C a message is
    printed and the process exits with status 0.
    """
    family, addr = _get_best_family(bind, port)
    ServerClass.address_family = family
    HandlerClass.protocol_version = protocol

    with ServerClass(addr, HandlerClass) as httpd:
        host, port = httpd.socket.getsockname()[:2]
        # IPv6 literals need brackets inside a URL.
        url_host = host if ":" not in host else f"[{host}]"
        print(f"Serving HTTP on {host} port {port} (http://{url_host}:{port}/) ...")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nKeyboard interrupt received, exiting.")
            sys.exit(0)
|
|
|
|
|
if __name__ == "__main__":
    import argparse
    import contextlib

    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--iso", dest="filename", help="ISO file to open", default="texlive2022.iso")
    parser.add_argument("--bind", "-b", metavar="ADDRESS", help="specify alternate bind address " "(default: all interfaces)")
    parser.add_argument("port", action="store", default=8000, type=int, nargs="?", help="specify alternate port (default: 8000)")
    args = parser.parse_args()

    iso = pycdlib.PyCdlib()
    iso.open(args.filename)

    try:
        # The archives in the /archive subdirectory carry a revision number
        # in their file name, while clients ask for the unversioned name:
        #   /archive/hyphen-base.tar.xz  ->  /archive/hyphen-base.r62751.tar.xz
        # archive_r maps the requested name to the stored one. It stays a
        # module-level name because the request handler may look it up
        # globally.
        archive_r = {}
        revision_suffix = re.compile(r"(\.r\d+)\.tar\.xz")
        for record in iso.list_children(joliet_path="/archive"):
            raw_name = record.file_identifier()
            # Skip the "." / ".." entries: they are not archives and their
            # identifiers are not UTF-16 names.
            if raw_name in (b".", b".."):
                continue
            stored_name = raw_name.decode("utf-16_be")
            archive_r[revision_suffix.sub(".tar.xz", stored_name)] = stored_name

        # ensure dual-stack is not disabled; ref #38907
        class DualStackServer(ThreadingHTTPServer):
            def server_bind(self):
                # suppress exception when protocol is IPv4
                with contextlib.suppress(Exception):
                    self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
                return super().server_bind()

            def finish_request(self, request, client_address):
                # Hand the opened ISO and the archive name map to every handler.
                self.RequestHandlerClass(request, client_address, self, iso=iso, archive_r=archive_r)

        serve(
            HandlerClass=SimpleHTTPRequestHandler,
            ServerClass=DualStackServer,
            port=args.port,
            bind=args.bind,
        )
    finally:
        # serve() leaves via sys.exit() on Ctrl-C; release the ISO file
        # handle on every exit path.
        iso.close()