Skip to content

Instantly share code, notes, and snippets.

@rene-d
Last active September 16, 2022 04:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rene-d/7762e854aa66649a11860b47ff0701a0 to your computer and use it in GitHub Desktop.
Save rene-d/7762e854aa66649a11860b47ff0701a0 to your computer and use it in GitHub Desktop.
Serve an ISO image in a HTTP server.

tl-iso-server.py

Serve an ISO image in a HTTP server. Designed for use with TeX Live, but should work for any ISO image.

Very suitable for offline and non-root environments, especially if you cannot mount -o loop the image.

Requirements

Download the ISO image

curl -O https://mirror.ctan.org/systems/texlive/Images/texlive2022.iso

Download the network installer

curl -O https://mirror.ctan.org/systems/texlive/tlnet/install-tl-unx.tar.gz

Run the server

python3 tl-iso-server.py

Install TeX Live 2022

perl install-tl -no-gui --no-interaction -repository http://localhost:8000/ -paper a4 -texdir /whatever/texlive/2022

Nota

The installer that comes with the 2022 distro does not have the -texdir option. However you can extract it:

wget -np -m --no-host-directories http://localhost:8000/install-tl http://localhost:8000/tlpkg/TeXLive/ http://localhost:8000/tlpkg/installer/
#!/usr/bin/env python3
"""
Serve the [TeX Live](https://www.tug.org/texlive/) ISO image in a HTTP server. Very suitable for offline and non-root environments, especially if you cannot `mount -o loop` the image.
Requirements
- curl, perl, python3, [pycdlib](https://clalancette.github.io/pycdlib/)
Download the ISO image:
```shell
curl -O https://mirror.ctan.org/systems/texlive/Images/texlive2022.iso
```
Download the network installer:
```shell
curl -O https://mirror.ctan.org/systems/texlive/tlnet/install-tl-unx.tar.gz
```
Run the server:
```shell
python3 tl-iso-server.py
```
Install TeX Live 2022:
```shell
perl install-tl -no-gui --no-interaction -repository http://localhost:8000/ -paper a4 -texdir /whatever/texlive/2022
```
"""
__version__ = "0.0"
import pycdlib
from pycdlib.dr import DirectoryRecord
from pycdlib.dates import DirectoryRecordDate
import datetime
import email.utils
import html
import http.client
import io
import mimetypes
import posixpath
import re
import socket # For gethostbyaddr()
import socketserver
import sys
import time
import urllib.parse
import shutil
from http import HTTPStatus
# Default error message template
DEFAULT_ERROR_MESSAGE = """\
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"
"http://www.w3.org/TR/html4/strict.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=utf-8">
<title>Error response</title>
</head>
<body>
<h1>Error response</h1>
<p>Error code: %(code)d</p>
<p>Message: %(message)s.</p>
<p>Error code explanation: %(code)s - %(explain)s.</p>
</body>
</html>
"""
DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8"
class HTTPServer(socketserver.TCPServer):
allow_reuse_address = 1 # Seems to make sense in testing environment
def server_bind(self):
"""Override server_bind to store the server name."""
socketserver.TCPServer.server_bind(self)
host, port = self.server_address[:2]
self.server_name = socket.getfqdn(host)
self.server_port = port
class ThreadingHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
daemon_threads = True
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
# The Python system version, truncated to its first component.
sys_version = "Python/" + sys.version.split()[0]
# The server software version. You may want to override this.
# The format is multiple whitespace-separated strings,
# where each string is of the form name[/version].
server_version = "BaseHTTP/" + __version__
error_message_format = DEFAULT_ERROR_MESSAGE
error_content_type = DEFAULT_ERROR_CONTENT_TYPE
# The default request version. This only affects responses up until
# the point where the request line is parsed, so it mainly decides what
# the client gets back when sending a malformed request line.
# Most web servers default to HTTP 0.9, i.e. don't send a status line.
default_request_version = "HTTP/0.9"
def parse_request(self):
"""Parse a request (internal).
The request should be stored in self.raw_requestline; the results
are in self.command, self.path, self.request_version and
self.headers.
Return True for success, False for failure; on failure, any relevant
error response has already been sent back.
"""
self.command = None # set in case of error on the first line
self.request_version = version = self.default_request_version
self.close_connection = True
requestline = str(self.raw_requestline, "iso-8859-1")
requestline = requestline.rstrip("\r\n")
self.requestline = requestline
words = requestline.split()
if len(words) == 0:
return False
if len(words) >= 3: # Enough to determine protocol version
version = words[-1]
try:
if not version.startswith("HTTP/"):
raise ValueError
base_version_number = version.split("/", 1)[1]
version_number = base_version_number.split(".")
# RFC 2145 section 3.1 says there can be only one "." and
# - major and minor numbers MUST be treated as
# separate integers;
# - HTTP/2.4 is a lower version than HTTP/2.13, which in
# turn is lower than HTTP/12.3;
# - Leading zeros MUST be ignored by recipients.
if len(version_number) != 2:
raise ValueError
version_number = int(version_number[0]), int(version_number[1])
except (ValueError, IndexError):
self.send_error(HTTPStatus.BAD_REQUEST, "Bad request version (%r)" % version)
return False
if version_number >= (1, 1) and self.protocol_version >= "HTTP/1.1":
self.close_connection = False
if version_number >= (2, 0):
self.send_error(HTTPStatus.HTTP_VERSION_NOT_SUPPORTED, "Invalid HTTP version (%s)" % base_version_number)
return False
self.request_version = version
if not 2 <= len(words) <= 3:
self.send_error(HTTPStatus.BAD_REQUEST, "Bad request syntax (%r)" % requestline)
return False
command, path = words[:2]
if len(words) == 2:
self.close_connection = True
if command != "GET":
self.send_error(HTTPStatus.BAD_REQUEST, "Bad HTTP/0.9 request type (%r)" % command)
return False
self.command, self.path = command, path
# gh-87389: The purpose of replacing '//' with '/' is to protect
# against open redirect attacks possibly triggered if the path starts
# with '//' because http clients treat //path as an absolute URI
# without scheme (similar to http://path) rather than a path.
if self.path.startswith("//"):
self.path = "/" + self.path.lstrip("/") # Reduce to a single /
# Examine the headers and look for a Connection directive.
try:
self.headers = http.client.parse_headers(self.rfile, _class=self.MessageClass)
except http.client.LineTooLong as err:
self.send_error(HTTPStatus.REQUEST_HEADER_FIELDS_TOO_LARGE, "Line too long", str(err))
return False
except http.client.HTTPException as err:
self.send_error(HTTPStatus.REQUEST_HEADER_FIELDS_TOO_LARGE, "Too many headers", str(err))
return False
conntype = self.headers.get("Connection", "")
if conntype.lower() == "close":
self.close_connection = True
elif conntype.lower() == "keep-alive" and self.protocol_version >= "HTTP/1.1":
self.close_connection = False
# Examine the headers and look for an Expect directive
expect = self.headers.get("Expect", "")
if expect.lower() == "100-continue" and self.protocol_version >= "HTTP/1.1" and self.request_version >= "HTTP/1.1":
if not self.handle_expect_100():
return False
return True
def handle_expect_100(self):
"""Decide what to do with an "Expect: 100-continue" header.
If the client is expecting a 100 Continue response, we must
respond with either a 100 Continue or a final response before
waiting for the request body. The default is to always respond
with a 100 Continue. You can behave differently (for example,
reject unauthorized requests) by overriding this method.
This method should either return True (possibly after sending
a 100 Continue response) or send an error response and return
False.
"""
self.send_response_only(HTTPStatus.CONTINUE)
self.end_headers()
return True
def handle_one_request(self):
"""Handle a single HTTP request.
You normally don't need to override this method; see the class
__doc__ string for information on how to handle specific HTTP
commands such as GET and POST.
"""
try:
self.raw_requestline = self.rfile.readline(65537)
if len(self.raw_requestline) > 65536:
self.requestline = ""
self.request_version = ""
self.command = ""
self.send_error(HTTPStatus.REQUEST_URI_TOO_LONG)
return
if not self.raw_requestline:
self.close_connection = True
return
if not self.parse_request():
# An error code has been sent, just exit
return
mname = "do_" + self.command
if not hasattr(self, mname):
self.send_error(HTTPStatus.NOT_IMPLEMENTED, "Unsupported method (%r)" % self.command)
return
method = getattr(self, mname)
method()
self.wfile.flush() # actually send the response if not already done.
except TimeoutError as e:
# a read or a write timed out. Discard this connection
self.log_error("Request timed out: %r", e)
self.close_connection = True
return
def handle(self):
"""Handle multiple requests if necessary."""
self.close_connection = True
self.handle_one_request()
while not self.close_connection:
self.handle_one_request()
def send_error(self, code, message=None, explain=None):
"""Send and log an error reply.
Arguments are
* code: an HTTP error code
3 digits
* message: a simple optional 1 line reason phrase.
*( HTAB / SP / VCHAR / %x80-FF )
defaults to short entry matching the response code
* explain: a detailed message defaults to the long entry
matching the response code.
This sends an error response (so it must be called before any
output has been generated), logs the error, and finally sends
a piece of HTML explaining the error to the user.
"""
try:
shortmsg, longmsg = self.responses[code]
except KeyError:
shortmsg, longmsg = "???", "???"
if message is None:
message = shortmsg
if explain is None:
explain = longmsg
self.log_error("code %d, message %s", code, message)
self.send_response(code, message)
self.send_header("Connection", "close")
# Message body is omitted for cases described in:
# - RFC7230: 3.3. 1xx, 204(No Content), 304(Not Modified)
# - RFC7231: 6.3.6. 205(Reset Content)
body = None
if code >= 200 and code not in (HTTPStatus.NO_CONTENT, HTTPStatus.RESET_CONTENT, HTTPStatus.NOT_MODIFIED):
# HTML encode to prevent Cross Site Scripting attacks
# (see bug #1100201)
content = self.error_message_format % {
"code": code,
"message": html.escape(message, quote=False),
"explain": html.escape(explain, quote=False),
}
body = content.encode("UTF-8", "replace")
self.send_header("Content-Type", self.error_content_type)
self.send_header("Content-Length", str(len(body)))
self.end_headers()
if self.command != "HEAD" and body:
self.wfile.write(body)
def send_response(self, code, message=None):
"""Add the response header to the headers buffer and log the
response code.
Also send two standard headers with the server software
version and the current date.
"""
self.log_request(code)
self.send_response_only(code, message)
self.send_header("Server", self.version_string())
self.send_header("Date", self.date_time_string())
def send_response_only(self, code, message=None):
"""Send the response header only."""
if self.request_version != "HTTP/0.9":
if message is None:
if code in self.responses:
message = self.responses[code][0]
else:
message = ""
if not hasattr(self, "_headers_buffer"):
self._headers_buffer = []
self._headers_buffer.append(("%s %d %s\r\n" % (self.protocol_version, code, message)).encode("latin-1", "strict"))
def send_header(self, keyword, value):
"""Send a MIME header to the headers buffer."""
if self.request_version != "HTTP/0.9":
if not hasattr(self, "_headers_buffer"):
self._headers_buffer = []
self._headers_buffer.append(("%s: %s\r\n" % (keyword, value)).encode("latin-1", "strict"))
if keyword.lower() == "connection":
if value.lower() == "close":
self.close_connection = True
elif value.lower() == "keep-alive":
self.close_connection = False
def end_headers(self):
"""Send the blank line ending the MIME headers."""
if self.request_version != "HTTP/0.9":
self._headers_buffer.append(b"\r\n")
self.flush_headers()
def flush_headers(self):
if hasattr(self, "_headers_buffer"):
self.wfile.write(b"".join(self._headers_buffer))
self._headers_buffer = []
def log_request(self, code="-", size="-"):
"""Log an accepted request.
This is called by send_response().
"""
if isinstance(code, HTTPStatus):
code = code.value
self.log_message('"%s" %s %s', self.requestline, str(code), str(size))
def log_error(self, format, *args):
"""Log an error.
This is called when a request cannot be fulfilled. By
default it passes the message on to log_message().
Arguments are the same as for log_message().
XXX This should go to the separate error log.
"""
self.log_message(format, *args)
def log_message(self, format, *args):
"""Log an arbitrary message.
This is used by all other logging functions. Override
it if you have specific logging wishes.
The first argument, FORMAT, is a format string for the
message to be logged. If the format string contains
any % escapes requiring parameters, they should be
specified as subsequent arguments (it's just like
printf!).
The client ip and current date/time are prefixed to
every message.
"""
sys.stderr.write("%s - - [%s] %s\n" % (self.address_string(), self.log_date_time_string(), format % args))
def version_string(self):
"""Return the server software version string."""
return self.server_version + " " + self.sys_version
def date_time_string(self, timestamp: DirectoryRecordDate = None):
"""Return the current date and time formatted for a message header."""
if timestamp is None:
timestamp = time.time()
return email.utils.formatdate(timestamp, usegmt=True)
def log_date_time_string(self):
"""Return the current time formatted for logging."""
now = time.time()
year, month, day, hh, mm, ss, x, y, z = time.localtime(now)
s = "%02d/%3s/%04d %02d:%02d:%02d" % (day, self.monthname[month], year, hh, mm, ss)
return s
weekdayname = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
monthname = [None, "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
def address_string(self):
"""Return the client address."""
return self.client_address[0]
# Essentially static class variables
# The version of the HTTP protocol we support.
# Set this to HTTP/1.1 to enable automatic keepalive
protocol_version = "HTTP/1.0"
# MessageClass used to parse headers
MessageClass = http.client.HTTPMessage
# hack to maintain backwards compatibility
responses = {v: (v.phrase, v.description) for v in HTTPStatus.__members__.values()}
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
"""Simple HTTP request handler with GET and HEAD commands.
This serves files from the current directory and any of its
subdirectories. The MIME type for files is determined by
calling the .guess_type() method.
The GET and HEAD requests are identical except that the HEAD
request omits the actual contents of the file.
"""
server_version = "SimpleHTTP/" + __version__
extensions_map = _encodings_map_default = {
".gz": "application/gzip",
".Z": "application/octet-stream",
".bz2": "application/x-bzip2",
".xz": "application/x-xz",
}
def __init__(self, *args, iso=None, archive_r={}, **kwargs):
if iso is None:
raise RuntimeError
self.iso = iso
self.archive_r = archive_r
super().__init__(*args, **kwargs)
def do_GET(self):
"""Serve a GET request."""
f = self.send_head()
if f:
try:
if isinstance(f, str):
self.iso.get_file_from_iso_fp(self.wfile, joliet_path=f)
else:
self.copyfile(f, self.wfile)
finally:
pass # f.close()
def do_HEAD(self):
"""Serve a HEAD request."""
f = self.send_head()
if f:
f.close()
def send_head(self):
"""Common code for GET and HEAD commands.
This sends the response code and MIME headers.
Return value is either a file object (which has to be copied
to the outputfile by the caller unless the command was HEAD,
and must be closed by the caller under all circumstances), or
None, in which case the caller has nothing further to do.
"""
path = self.translate_path(self.path)
if posixpath.dirname(path) == "/archive":
path = "/archive/" + archive_r[posixpath.basename(path)]
try:
f: DirectoryRecord = self.iso.get_record(joliet_path=path)
except pycdlib.pycdlibexception.PyCdlibInvalidInput:
self.send_error(HTTPStatus.NOT_FOUND, "File not found")
return None
if f.is_dir():
return self.list_directory(path)
ctype = self.guess_type(path)
# check for trailing "/" which should return 404. See Issue17324
# The test for this was added in test_httpserver.py
# However, some OS platforms accept a trailingSlash as a filename
# See discussion on python-dev and Issue34711 regarding
# parseing and rejection of filenames with a trailing slash
if self.path.endswith("/"):
self.send_error(HTTPStatus.NOT_FOUND, "File not found")
return None
timestamp = datetime.datetime(
f.date.years_since_1900 + 1900, f.date.month, f.date.day_of_month, f.date.hour, f.date.minute, f.date.second
).timestamp()
# Use browser cache if possible
if "If-Modified-Since" in self.headers and "If-None-Match" not in self.headers:
# compare If-Modified-Since and time of last file modification
try:
ims = email.utils.parsedate_to_datetime(self.headers["If-Modified-Since"])
except (TypeError, IndexError, OverflowError, ValueError):
# ignore ill-formed values
pass
else:
if ims.tzinfo is None:
# obsolete format with no timezone, cf.
# https://tools.ietf.org/html/rfc7231#section-7.1.1.1
ims = ims.replace(tzinfo=datetime.timezone.utc)
if ims.tzinfo is datetime.timezone.utc:
# compare to UTC datetime of last modification
last_modif = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
# remove microseconds, like in If-Modified-Since
last_modif = last_modif.replace(microsecond=0)
if last_modif <= ims:
self.send_response(HTTPStatus.NOT_MODIFIED)
self.end_headers()
return None
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", ctype)
self.send_header("Content-Length", f.inode.get_data_length())
self.send_header("Last-Modified", self.date_time_string(timestamp))
self.end_headers()
return self.iso.open_file_from_iso(joliet_path=path)
def list_directory(self, path):
"""Helper to produce a directory listing (absent index.html).
Return value is either a file object, or None (indicating an
error). In either case, the headers are sent, making the
interface the same as for send_head().
"""
try:
list = [i.file_identifier().decode("utf-16_be") for i in self.iso.list_children(joliet_path=path)]
except OSError:
self.send_error(HTTPStatus.NOT_FOUND, "No permission to list directory")
return None
list.sort(key=lambda a: a.lower())
r = []
try:
displaypath = urllib.parse.unquote(self.path, errors="surrogatepass")
except UnicodeDecodeError:
displaypath = urllib.parse.unquote(path)
displaypath = html.escape(displaypath, quote=False)
enc = sys.getfilesystemencoding()
title = "Directory listing for %s" % displaypath
r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" ' '"http://www.w3.org/TR/html4/strict.dtd">')
r.append("<html>\n<head>")
r.append('<meta http-equiv="Content-Type" ' 'content="text/html; charset=%s">' % enc)
r.append("<title>%s</title>\n</head>" % title)
r.append("<body>\n<h1>%s</h1>" % title)
r.append("<hr>\n<ul>")
for name in list:
if name == ".":
continue
if name == ".." and path == "/":
continue
fullname = path.rstrip("/") + "/" + name
displayname = linkname = name
# Append / for directories or @ for symbolic links
if self.iso.get_record(rr_path=fullname).is_dir():
displayname = name + "/"
linkname = name + "/"
r.append(
'<li><a href="%s">%s</a></li>'
% (urllib.parse.quote(linkname, errors="surrogatepass"), html.escape(displayname, quote=False))
)
r.append("</ul>\n<hr>\n</body>\n</html>\n")
encoded = "\n".join(r).encode(enc, "surrogateescape")
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", "text/html; charset=%s" % enc)
self.send_header("Content-Length", str(len(encoded)))
self.end_headers()
return f
def translate_path(self, path):
"""Translate a /-separated PATH to the local filename syntax.
Components that mean special things to the local file system
(e.g. drive or directory names) are ignored. (XXX They should
probably be diagnosed.)
"""
# abandon query parameters
path = path.split("?", 1)[0]
path = path.split("#", 1)[0]
# Don't forget explicit trailing slash when normalizing. Issue17324
trailing_slash = path.rstrip().endswith("/")
try:
path = urllib.parse.unquote(path, errors="surrogatepass")
except UnicodeDecodeError:
path = urllib.parse.unquote(path)
path = posixpath.normpath(path)
return path
def copyfile(self, source, outputfile):
"""Copy all data between two file objects.
The SOURCE argument is a file object open for reading
(or anything with a read() method) and the DESTINATION
argument is a file object open for writing (or
anything with a write() method).
The only reason for overriding this would be to change
the block size or perhaps to replace newlines by CRLF
-- note however that this the default server uses this
to copy binary data as well.
"""
with source as source_fp:
shutil.copyfileobj(source_fp, outputfile)
pass
def guess_type(self, path):
"""Guess the type of a file.
Argument is a PATH (a filename).
Return value is a string of the form type/subtype,
usable for a MIME Content-type header.
The default implementation looks the file's extension
up in the table self.extensions_map, using application/octet-stream
as a default; however it would be permissible (if
slow) to look inside the data to make a better guess.
"""
base, ext = posixpath.splitext(path)
if ext in self.extensions_map:
return self.extensions_map[ext]
ext = ext.lower()
if ext in self.extensions_map:
return self.extensions_map[ext]
guess, _ = mimetypes.guess_type(path)
if guess:
return guess
return "application/octet-stream"
def _get_best_family(*address):
infos = socket.getaddrinfo(
*address,
type=socket.SOCK_STREAM,
flags=socket.AI_PASSIVE,
)
family, type, proto, canonname, sockaddr = next(iter(infos))
return family, sockaddr
def serve(HandlerClass=BaseHTTPRequestHandler, ServerClass=ThreadingHTTPServer, protocol="HTTP/1.0", port=8000, bind=None):
"""Test the HTTP request handler class.
This runs an HTTP server on port 8000 (or the port argument).
"""
ServerClass.address_family, addr = _get_best_family(bind, port)
HandlerClass.protocol_version = protocol
with ServerClass(addr, HandlerClass) as httpd:
host, port = httpd.socket.getsockname()[:2]
url_host = f"[{host}]" if ":" in host else host
print(f"Serving HTTP on {host} port {port} " f"(http://{url_host}:{port}/) ...")
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\nKeyboard interrupt received, exiting.")
sys.exit(0)
if __name__ == "__main__":
import argparse
import contextlib
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--iso", dest="filename", help="ISO file to open", default="texlive2022.iso")
parser.add_argument("--bind", "-b", metavar="ADDRESS", help="specify alternate bind address " "(default: all interfaces)")
parser.add_argument("port", action="store", default=8000, type=int, nargs="?", help="specify alternate port (default: 8000)")
args = parser.parse_args()
iso = pycdlib.PyCdlib()
iso.open(args.filename)
# the archives in the /archive subdirectory are versioned with a revision number
# aka. /archive/hyphen-base.r62751.tar.xz ⇔ /archive/hyphen-base.tar.xz
archive_r = {}
for r in iso.list_children(joliet_path="/archive"):
r = r.file_identifier().decode("utf-16_be")
a = re.sub(r"(\.r\d+)\.tar\.xz", ".tar.xz", r)
archive_r[a] = r
# ensure dual-stack is not disabled; ref #38907
class DualStackServer(ThreadingHTTPServer):
def server_bind(self):
# suppress exception when protocol is IPv4
with contextlib.suppress(Exception):
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
return super().server_bind()
def finish_request(self, request, client_address):
self.RequestHandlerClass(request, client_address, self, iso=iso, archive_r=archive_r)
serve(
HandlerClass=SimpleHTTPRequestHandler,
ServerClass=DualStackServer,
port=args.port,
bind=args.bind,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment