Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save swerder/f1ef7cfb48f45370cd75fea47593de74 to your computer and use it in GitHub Desktop.
Save swerder/f1ef7cfb48f45370cd75fea47593de74 to your computer and use it in GitHub Desktop.
Simple Python Http Server with Upload
#!/usr/bin/env python3
"""HTTP Server with auth and Upload.
This module builds on SimpleHTTPRequestHandler and adds:
simple (HTTP Basic) authentication, SSL/TLS,
full "multipart" RFC 2046 handling,
multi-file upload, folder creation and file deletion,
and a table-view file listing with icon/size/date.
from bones7456: https://github.com/bones7456/bones7456/blob/master/SimpleHTTPServerWithUpload.py / https://gist.github.com/UniIsland/3346170
- originally based logic
from cpdef: https://gist.github.com/cpdef/9f4fa956ff41ca8b9902c4b329596acc
- authenticate, certificate, python3
from joonahn: https://gist.github.com/joonahn/5c21fde633bf61087fa3faea77e1f77f
- multi upload
from Tallguy297: https://github.com/Tallguy297/SimpleHTTPServerWithUpload/blob/master/SimpleHTTPServerWithUpload.py
- style change
from swerder: https://gist.github.com/swerder/f1ef7cfb48f45370cd75fea47593de74
- merging all together
- extend SimpleHTTPRequestHandler not BaseHTTPRequestHandler (getting if-modified support)
- refactor html generation
- complete refactor
- add full "multipart/form-data" handling
- add folder creation
- add file deletion
create certificate with the following command:
openssl req -new -x509 -keyout server.pem -out server.pem -days 365 -nodes
usage: server.py <port> <user:password> <certfile>
example: ../server.py 8443 test:topsecret ../server.pem
access the server via the browser:
https://ip:port
the "s" in https is important for ssl/tls
"""
# Module metadata. __version__ is appended to every handler's server_version string.
__version__ = "0.3"
# Public API of this module: the request handler classes.
__all__ = ["AuthRequestHandler", "MultipartAuthRequestHandler","FileUploadRequestHandler","FileUploadRequestHandlerTableView"]
# Author, source page and contributors are shown in the footer of the upload result page.
__author__ = "swerder"
__source_page__ = "https://gist.github.com/swerder/f1ef7cfb48f45370cd75fea47593de74"
__contributors__ = "bones7456,cpdef,joonahn,Tallguy297"
import os
import posixpath
import http.server
import urllib.request, urllib.parse, urllib.error
import html
import http
import shutil
import mimetypes
import re
from io import BytesIO
import ssl
import sys
import base64
import time
from string import Template
class AuthRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Handler for HEAD, GET and POST that enforces HTTP Basic authentication.

    Every request is checked against the module-level ``key`` (the
    base64-encoded ``user:password`` pair).  After a successful check the
    matching ``do_<VERB>_auth`` method is called (e.g. ``do_GET_auth``);
    subclasses override those hooks instead of ``do_<VERB>``.
    """

    server_version = "AuthRequestHandler/" + __version__
    sys_version = ""  # replaces Python/3.X.X
    # Seconds to wait after a failed login before the handler returns.
    sleep_after_login_failed = 10
    realm = "Login"

    def is_authenticated(self):
        """Return True when the Authorization header matches the configured key."""
        # Local import keeps this block self-contained.
        import hmac

        global key
        auth_header = self.headers['Authorization']
        if auth_header is None:
            return False
        expected = b'Basic ' + key
        # Constant-time comparison: do not leak how many leading bytes matched.
        return hmac.compare_digest(auth_header.encode("utf-8"), expected)

    def do_AUTHHEAD(self):
        """Send the 401 response headers that ask the client for credentials."""
        self.send_response(http.HTTPStatus.UNAUTHORIZED)
        self.send_header('WWW-Authenticate', f'Basic realm="{self.realm}"')
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def try_authenticate(self):
        """Check the request's credentials.

        Returns True when authenticated.  Otherwise a 401 response is sent,
        the connection is marked for closing, the handler sleeps for
        ``sleep_after_login_failed`` seconds and False is returned.
        """
        if self.is_authenticated():
            print('authenticated')
            return True
        self.server_version = ""  # don't show what I am before login
        self.do_AUTHHEAD()
        print('not authenticated')
        # A response to HEAD must not carry a body.
        if self.command != 'HEAD':
            self.wfile.write(b'not authenticated')
        self.close_connection = True
        # The delay is a class attribute; the bare name used previously
        # raised NameError on every failed login.
        time.sleep(self.sleep_after_login_failed)
        return False

    def do_HEAD_auth(self):
        """Serve an authenticated HEAD request."""
        super().do_HEAD()

    def do_GET_auth(self):
        """Serve an authenticated GET request."""
        super().do_GET()

    def do_POST_auth(self):
        """Serve an authenticated POST request.

        SimpleHTTPRequestHandler has no do_POST, so unless another base class
        provides one the request is answered with 501 instead of raising
        AttributeError.
        """
        parent_post = getattr(super(), 'do_POST', None)
        if parent_post is None:
            self.send_error(http.HTTPStatus.NOT_IMPLEMENTED,
                            "Unsupported method ('POST')")
        else:
            parent_post()

    def do_HEAD(self):
        """Serve a HEAD request."""
        if self.try_authenticate():
            self.do_HEAD_auth()

    def do_GET(self):
        """Serve a GET request."""
        if self.try_authenticate():
            self.do_GET_auth()

    def do_POST(self):
        """Serve a POST request."""
        if self.try_authenticate():
            self.do_POST_auth()
class PartConf:
    """State of the multipart section that is currently being parsed.

    name    -- section name: "preamble", "__header__", "epilogue" or the
               name given by the code that creates the part
    out     -- writable binary stream receiving the part's content, or None
    headers -- dict of the part's headers, or None
    handler -- callable invoked with this PartConf once the part is complete,
               or None
    """

    def __init__(self, name, out=None, headers=None, handler=None):
        self.name, self.out = name, out
        self.headers, self.handler = headers, handler
class DisplayableException(Exception):
    """Error whose message is reported to the client in the POST response page."""
class MultipartAuthRequestHandler(AuthRequestHandler):
    """Handler that parses POST requests with content-type 'multipart/*'.

    Designed for 'multipart/form-data'.  Subclasses customise the handling
    through the hooks deal_field_header(), deal_field_data() and
    get_POST_body_html().
    """

    server_version = "MultipartAuthRequestHandler/" + __version__

    def generate_html_response(self, title, headContent, bodyContent, lang="en", enc="UTF-8"):
        """Return a complete HTML document as bytes encoded with *enc*.

        title, headContent and bodyContent are inserted verbatim, so callers
        must pass already-escaped markup.
        """
        document = (
            "\n"
            "<!DOCTYPE html>\n"
            f'<html lang="{lang}">\n'
            "<head>\n"
            f'<meta charset="{enc}">\n'
            f"<title>{title}</title>\n"
            f"{headContent}\n"
            "</head>\n"
            "<body>\n"
            f"{bodyContent}\n"
            "</body>\n"
            "</html>\n"
        )
        return document.encode(enc, 'surrogateescape')

    def send_html_response_header(self, htmlContent, status=http.HTTPStatus.OK, enc="UTF-8"):
        """Send status line and headers for the encoded page *htmlContent*."""
        self.send_response(status)
        self.send_header("Content-type", f"text/html; charset={enc}")
        self.send_header("Content-Length", str(len(htmlContent)))
        self.end_headers()

    def send_html_response(self, title, headContent, bodyContent, lang="en", enc="UTF-8", status=http.HTTPStatus.OK):
        """Build an HTML page and send it including its headers."""
        htmlContent = self.generate_html_response(title, headContent, bodyContent, lang, enc)
        self.send_html_response_header(htmlContent, status, enc)
        self.wfile.write(htmlContent)

    def do_POST_auth(self):
        """Process the POST body and answer with a result page."""
        try:
            info = self.deal_post_data()
            status = http.HTTPStatus.OK
        except DisplayableException as ex:
            status = http.HTTPStatus.INTERNAL_SERVER_ERROR
            info = str(ex)
        print((status, info, "by: ", self.client_address, self.headers.get('X-Forwarded-For', '')))
        bodyContent = self.get_POST_body_html(info, status)
        self.send_html_response("post response page", "", bodyContent, status=status)

    def get_POST_body_html(self, info, status):
        """Return the HTML body of the POST result page (hook for subclasses)."""
        return info

    def deal_post_data(self):
        """Dispatch the request body by content type.

        Raises DisplayableException when the body is not multipart.
        """
        if self.headers.get_content_maintype() == "multipart":
            return self.deal_multipart_data()
        # The content type is client supplied and ends up in the HTML result page.
        contentType = html.escape(self.headers.get_content_type())
        raise DisplayableException("multipart Content-Type required, e.g. multipart/form-data but is " + contentType)

    def deal_multipart_data(self):
        """Read and process all parts of a multipart request body.

        Returns the messages of all part handlers joined with "<br>".
        Raises DisplayableException for a malformed request.
        """
        boundary = self.headers.get_boundary()
        if boundary is None:
            raise DisplayableException("Content-Type header doesn't contain boundary")
        # RFC 2046: a delimiter line is "--" + boundary; the closing
        # delimiter carries an additional trailing "--".
        delimiter = b"--" + boundary.encode()
        closeDelimiter = delimiter + b"--"
        try:
            remainbytes = int(self.headers['content-length'])
        except (TypeError, ValueError):
            raise DisplayableException("missing or invalid Content-Length header") from None
        response = []
        # read all parts, https://www.rfc-editor.org/rfc/rfc2046#section-5.1.1
        partConf = PartConf("preamble")
        # Content is written one line late, because the line break in front
        # of a delimiter belongs to the delimiter and must be dropped.
        preline = None
        while remainbytes > 0:
            # Never read past the declared body length.
            line = self.rfile.readline(remainbytes)
            if not line:
                # Client closed the connection early; without this check the
                # loop would spin forever because remainbytes stops shrinking.
                break
            remainbytes -= len(line)
            # Only a line that consists of the delimiter (plus optional
            # trailing whitespace) ends a part; content that merely contains
            # the boundary text somewhere is ordinary data.
            marker = line.rstrip(b" \t\r\n")
            if marker == delimiter or marker == closeDelimiter:
                if partConf.out is not None:
                    # write last part/preline but remove CRLF (part of boundary logic)
                    if preline.endswith(b'\r\n'):
                        preline = preline[:-2]
                    elif preline.endswith((b'\n', b'\r')):
                        preline = preline[:-1]
                    partConf.out.write(preline)
                if partConf.handler:
                    message = partConf.handler(partConf)
                    if message is not None:
                        response.append(message)
                elif partConf.out:
                    partConf.out.close()
                if marker == closeDelimiter:
                    partConf = PartConf("epilogue")
                else:
                    partConf = PartConf("__header__", None, {})
                preline = None
            else:
                if partConf.out is not None:
                    partConf.out.write(preline)
                elif partConf.name == "__header__" and preline is not None:
                    # read part header
                    if preline in (b'\r\n', b'\n'):
                        # An empty line ends the headers; content starts with *line*.
                        partConf = self.deal_multipart_header(partConf)
                    else:
                        header = [p.strip() for p in preline.decode().split(":", 1)]
                        # -1 = last, here same as 1, but no error if there is no ':' delimiter
                        partConf.headers[header[0]] = header[-1]
                preline = line
        if partConf.name != "epilogue":
            # Do not leak an output that was opened for an unfinished part.
            if partConf.out is not None:
                partConf.out.close()
            raise DisplayableException("invalid multipart, end boundary not found")
        return "<br>".join(response)

    def split_header_value(self, header):
        """Split 'value; key="val"; ...' into (value, params dict).

        Parameters are split at the first "=" only, so values may contain
        "="; empty segments (e.g. a trailing ";") are ignored.  The header is
        split at every ";", quoted or not.
        """
        value, *rawParams = [p.strip() for p in header.split(";")]
        params = {}
        for rawParam in rawParams:
            if not rawParam:
                continue
            paramName, _, paramValue = rawParam.partition("=")
            params[paramName.strip()] = paramValue.strip().strip('"')
        return value, params

    def deal_multipart_header(self, partConf):
        """Return the PartConf for the content of the part whose headers were read.

        Raises DisplayableException when a multipart/form-data part has no
        valid Content-Disposition header.
        """
        if self.headers.get_content_type() != "multipart/form-data":
            return PartConf(None, BytesIO(), partConf.headers, self.deal_field_end)
        # https://www.rfc-editor.org/rfc/rfc7578#section-4.2
        if "Content-Disposition" not in partConf.headers:
            raise DisplayableException("missing Content-Disposition header, invalid multipart/form-data syntax")
        dispositionValue, dispositionParams = self.split_header_value(partConf.headers["Content-Disposition"])
        if dispositionValue != "form-data":
            shown = html.escape(dispositionValue)
            raise DisplayableException(f"invalid Content-Disposition header, expect 'form-data' type but get '{shown}'")
        if not dispositionParams.get("name"):
            raise DisplayableException("invalid Content-Disposition header, need name param")
        return self.deal_field_header(dispositionParams, partConf.headers)

    def deal_field_header(self, dispositionParams, headers):
        """Return the PartConf that collects a form field in memory (hook)."""
        return PartConf(dispositionParams.get("name"), BytesIO(), headers, self.deal_field_end)

    def deal_field_end(self, partConf):
        """Decode the collected field content and pass it to deal_field_data()."""
        raw = partConf.out.getvalue()
        partConf.out.close()
        try:
            content = raw.decode()
        except UnicodeDecodeError:
            shown = html.escape(str(partConf.name))
            raise DisplayableException(f"field '{shown}' is not valid UTF-8") from None
        return self.deal_field_data(partConf.name, content, partConf.headers)

    def deal_field_data(self, name, content, headers):
        """Return the message for a received field (hook for subclasses).

        Name and content come from the client and are HTML-escaped because
        the message becomes part of the result page.
        """
        return f"{html.escape(str(name))} = '{html.escape(content)}'"
class FileUploadRequestHandler(MultipartAuthRequestHandler):
    """HTTP request handler with GET/HEAD/POST commands.

    This serves files from the current directory and any of its
    subdirectories.  The MIME type for files is determined by
    calling the .guess_type() method.  It can also receive file uploads,
    folder creation and file delete requests from the client.

    The GET/HEAD requests are identical except that the HEAD
    request omits the actual contents of the file.
    """

    server_version = "FileUploadRequestHandler/" + __version__
    # Feature switches: each one enables the form in the listing and the
    # matching POST action.
    show_upload = True
    show_create_folder = True
    show_delete_file = True

    def get_base_style_html(self):
        """Return the <style> element used by the plain directory listing."""
        return (
            '<style type="text/css">\n'
            "* {font-family: Helvetica; font-size: 16px; }\n"
            "a { text-decoration: none; }\n"
            "</style>\n"
        )

    def get_POST_body_html(self, info, status):
        """Return the body of the upload result page.

        *info* is inserted verbatim and therefore has to be HTML already.
        """
        if 'referer' in self.headers:
            referer = html.escape(self.headers['referer'])
            backLink = f'<br><a href="{referer}">back</a>'
        else:
            backLink = ''
        result = "Success:" if status == http.HTTPStatus.OK else "Failed:"
        bodyTemplate = Template(
            "\n"
            "<h2>Upload Result Page</h2>\n"
            "<hr>\n"
            "<strong>$result</strong><br>\n"
            "$info\n"
            "$backLink\n"
            "<hr>\n"
            '<small>Powered By: $by, check for new version <a href="$src" target="_blank">here</a>.</small>\n'
            "<hr>\n"
        )
        return bodyTemplate.substitute(
            result=result,
            info=info,
            backLink=backLink,
            by=__author__ + " and " + __contributors__,
            src=__source_page__,
        ).strip()

    def deal_field_header(self, dispositionParams, headers):
        """Route parts that carry a filename to the upload handling."""
        if "filename" in dispositionParams:
            return self.deal_fileupload_header(dispositionParams, headers)
        return super().deal_field_header(dispositionParams, headers)

    def deal_fileupload_header(self, dispositionParams, headers):
        """Open the target file of an upload and return its PartConf.

        Raises DisplayableException when uploads are disabled, the file name
        is missing or invalid, or the file cannot be opened for writing.
        """
        if not self.show_upload:
            raise DisplayableException("disallowed upload request")
        filename = dispositionParams.get("filename")
        if not filename:
            raise DisplayableException("Can't find out file name...")
        filePath = self.get_full_path(filename)
        if filePath is None:
            raise DisplayableException("upload to folder not allowed")
        print(f"try to open {filePath}")
        try:
            out = open(filePath, 'wb')
        except OSError as ex:
            raise DisplayableException("Can't create file to write.<br>Do you have permission to write?") from ex
        partConf = PartConf("file", out, headers, self.deal_file_end)
        partConf.dispositionParams = dispositionParams
        return partConf

    def get_full_path(self, filename):
        """Return the path of *filename* inside the requested directory.

        Returns None when the name does not denote a direct child of that
        directory (e.g. "", "." or "..").
        """
        path = self.translate_path(self.path)
        # prevent any folder char in filename
        filename = filename.split('/')[-1].split('\\')[-1]
        filePath = os.path.join(path, filename)
        # Double check against the translated directory.  self.path is the raw
        # URL (still percent-encoded, possibly with a query string), so
        # comparing against it rejected every directory whose name needs quoting.
        if os.path.dirname(os.path.abspath(filePath)) != os.path.abspath(path):
            return None
        return filePath

    def deal_field_data(self, name, content, headers):
        """Execute the action of a plain form field ("folder" or "delete")."""
        if name == "folder":
            return self.deal_field_folder_data(content, headers)
        if name == "delete":
            return self.deal_field_delete_data(content, headers)
        raise DisplayableException(f"unknown field: {html.escape(str(name))} = '{html.escape(content)}'")

    def deal_field_folder_data(self, content, headers):
        """Create the folder named *content* in the requested directory."""
        if not self.show_create_folder:
            raise DisplayableException("disallowed folder request")
        dirPath = self.get_full_path(content)
        # The name is client supplied and echoed into the HTML result page.
        shown = html.escape(content)
        if not dirPath:
            raise DisplayableException(f"folder name invalid '{shown}'")
        if os.path.exists(dirPath):
            raise DisplayableException(f"folder '{shown}' already exist")
        try:
            os.mkdir(dirPath)
        except OSError as ex:
            raise DisplayableException(f"can't create folder '{shown}'") from ex
        return f"created folder: '{shown}'"

    def deal_field_delete_data(self, content, headers):
        """Delete the file named *content* from the requested directory."""
        if not self.show_delete_file:
            raise DisplayableException("disallowed delete request")
        filePath = self.get_full_path(content)
        # The name is client supplied and echoed into the HTML result page.
        shown = html.escape(content)
        if not filePath:
            raise DisplayableException(f"file name invalid '{shown}'")
        if not os.path.exists(filePath):
            raise DisplayableException(f"file does not exist '{shown}'")
        try:
            os.remove(filePath)
        except OSError as ex:
            # e.g. the name denotes a directory or permissions are missing
            raise DisplayableException(f"can't delete file '{shown}'") from ex
        return f"deleted file '{shown}'"

    def deal_file_end(self, partConf):
        """Close a finished upload and return its success message."""
        partConf.out.close()
        return html.escape(partConf.dispositionParams.get("filename")) + " uploaded"

    def list_directory(self, path):
        """Helper to produce a directory listing (absent index.html).

        Return value is either a file object, or None (indicating an
        error).  In either case, the headers are sent, making the
        interface the same as for send_head().
        """
        try:
            entries = os.listdir(path)
        except OSError:
            self.send_error(
                http.HTTPStatus.NOT_FOUND,
                "No permission to list directory")
            return None
        entries.sort(key=lambda a: a.lower())
        try:
            displaypath = urllib.parse.unquote(self.path, errors='surrogatepass')
        except UnicodeDecodeError:
            displaypath = urllib.parse.unquote(path)
        displaypath = html.escape(displaypath, quote=False)
        enc = sys.getfilesystemencoding()
        htmlContent = self.list_directory_generate_html(path, entries, displaypath, enc)
        self.send_html_response_header(htmlContent, enc=enc)
        f = BytesIO()
        f.write(htmlContent)
        f.seek(0)
        return f

    def list_directory_generate_html(self, path, list, displaypath, enc):
        """Return the encoded listing page for the entries in *list*."""
        fileListHtml, headContent = self.list_directory_files(path, list)
        title = f'Directory listing for {displaypath}'
        r = [f'<h1>{title}</h1>']
        if self.show_upload:
            r.append(self.get_upload_html())
        if self.show_create_folder:
            r.append(self.get_create_folder_html())
        r.append('<hr>')
        r.append(fileListHtml)
        r.append('<hr>')
        bodyContent = '\n'.join(r)
        return self.generate_html_response(title, headContent, bodyContent, enc=enc)

    def get_upload_html(self):
        """Return the HTML of the upload form."""
        return (
            "\n"
            "<hr>\n"
            '<form ENCTYPE="multipart/form-data" method="post">\n'
            '<input name="file" type="file" multiple/>\n'
            '<input type="submit" value="upload"/>\n'
            "</form>\n"
        )

    def get_create_folder_html(self):
        """Return the HTML of the create-folder form."""
        return (
            "\n"
            "<hr>\n"
            '<form ENCTYPE="multipart/form-data" method="post">\n'
            '<label for="folder">new folder</label>\n'
            '<input name="folder" id="folder" type="text"/>\n'
            '<input type="submit" value="create"/>\n'
            "</form>\n"
        )

    def get_delete_html(self, filename):
        """Return the HTML of the inline delete form for *filename*."""
        filename = html.escape(filename)
        return (
            "\n"
            '<form ENCTYPE="multipart/form-data" method="post" style="display: inline;">\n'
            f'<input name="delete" type="hidden" value="{filename}"/>\n'
            '<input type="submit" value="delete"/>\n'
            "</form>\n"
        )

    def list_directory_files(self, path, list):
        """Return (listing HTML, head HTML) for the entry names in *list*."""
        r = ['<ul>']
        if path[:-1] != os.getcwd():
            r.append('<li><a href="../">Parent Directory</a></li>')
        for name in list:
            fullname = os.path.join(path, name)
            displayname = linkname = name
            deleteForm = ""
            # Append / for directories or @ for symbolic links
            if os.path.isdir(fullname):
                displayname = name + "/"
                linkname = name + "/"
            elif self.show_delete_file:
                deleteForm = self.get_delete_html(name)
            if os.path.islink(fullname):
                displayname = name + "@"
                # Note: a link to a directory displays with @ and links with /
            r.append('<li><a href="%s">%s</a>%s</li>'
                     % (urllib.parse.quote(linkname, errors='surrogatepass'),
                        html.escape(displayname, quote=False), deleteForm))
        r.append('</ul>')
        return '\n'.join(r), self.get_base_style_html()
def fbytes(size):
    """Return the given number of bytes as a human friendly string.

    Values below 1024 are shown as a whole number of bytes ("512 Bytes",
    "1 Byte"); larger values are shown with two decimals in KB, MB, GB or
    TB (factor 1024).  Everything from 1 TB upwards is expressed in TB.
    """
    size = float(size)
    if size < 1024:
        # Whole bytes: previously the float leaked into the text ("512.0 Bytes").
        count = int(size)
        return f"{count} {'Byte' if count == 1 else 'Bytes'}"
    for unit in ("KB", "MB", "GB"):
        size /= 1024
        if size < 1024:
            return f"{size:.2f} {unit}"
    return f"{size / 1024:.2f} TB"
class FileUploadRequestHandlerTableView(FileUploadRequestHandler):
    """FileUploadRequestHandler that renders the listing as a table.

    Every row shows an icon, the linked name, the size, a date and
    (for files, when enabled) a delete button.
    """

    server_version = "FileUploadRequestHandlerTableView/" + __version__
    # Files with these extensions use the file itself as their icon.
    image_ext = ['bmp','gif','jpg','jpeg','png']
    # File extension -> CSS icon class.
    ext_to_class = {
        'avi':'img-movie',
        'mpg':'img-movie',
        'idx':'img-subtitle',
        'srt':'img-subtitle',
        'sub':'img-subtitle',
        'iso':'img-iso',
    }
    # CSS icon class -> image URL used as its background image.
    class_to_img = {
        'img-default':'',
        'img-parent':'',
        'img-dir':'',
        'img-link':'',
        'img-movie':'',
        'img-subtitle':'',
        'img-iso':'',
    }

    def list_directory_head_html_additional(self, usedClass):
        """Return the <style> element, with icon rules for the classes in *usedClass*."""
        img_class_templ = Template(".file-img.$clazz { background-image: url('$img')}")
        imgRules = "\n".join(
            img_class_templ.substitute(clazz=clazz, img=img)
            for clazz, img in self.class_to_img.items()
            if clazz in usedClass
        )
        return (
            "\n"
            '<style type="text/css">\n'
            "* {font-family: Helvetica; font-size: 16px; }\n"
            "a { text-decoration: none; }\n"
            "a:link { text-decoration: none; font-weight: bold; color: #0000ff; }\n"
            "a:visited { text-decoration: none; font-weight: bold; color: #0000ff; }\n"
            "a:active { text-decoration: none; font-weight: bold; color: #0000ff; }\n"
            "a:hover { text-decoration: none; font-weight: bold; color: #ff0000; }\n"
            "table { border-collapse: separate;}\n"
            "th, td { padding:0px 10px;}\n"
            ".file-size { text-align:right; font-weight: bold; color:#FF0000; }\n"
            ".file-date { text-align:right; font-weight: bold; }\n"
            ".file-img { width:24px; height:24px; background-size: contain; background-position: center; background-repeat: no-repeat;}\n"
            + imgRules +
            "\n"
            "</style>\n"
        )

    def list_directory_files(self, path, list):
        """Return (table HTML, head HTML) for the entry names in *list*."""
        file_html_templ = Template('<tr><td><div class="file-img $fileImgClass" $specialImgStyle></div></td><td><a href="$linkname" target="$target">$displayname</a></td><td class="file-size">$fsize</td><td class="file-date">$created_date</td>$deleteForm</tr>\n')
        img_style_templ = Template(''' style="background-image: url('$name')"''')
        r = ['<table>']
        usedClass = set()
        if path[:-1] != os.getcwd():
            fileImgClass = 'img-parent'
            r.append(file_html_templ.substitute(fileImgClass=fileImgClass, specialImgStyle="", linkname="../", target="_self", displayname="Parent Directory", fsize="", created_date="", deleteForm=''))
            usedClass.add(fileImgClass)
        for name in list:
            fileImgClass = 'img-default'
            target = '_blank'
            specialImgStyle = ""
            deleteForm = ''
            fsize = created_date = ''
            fullname = os.path.join(path, name)
            displayname = linkname = name
            isDir = os.path.isdir(fullname)
            # Append / for directories or @ for symbolic links
            if isDir:
                fileImgClass = 'img-dir'
                target = '_self'
                displayname = name + '/'
                linkname = name + '/'
            else:
                try:
                    fsize = fbytes(os.path.getsize(fullname))
                    created_date = time.strftime("%d.%m.%Y %H:%M", time.localtime(os.path.getctime(fullname)))
                except OSError:
                    # e.g. a dangling symlink: keep the row, omit size and date
                    fsize = created_date = ''
                if self.show_delete_file:
                    deleteForm = '<td>' + self.get_delete_html(name) + '</td>'
            if os.path.islink(fullname):
                fileImgClass = 'img-link'
                displayname = name + '@'
            if not isDir:
                # Extension icons apply to files only, so a directory named
                # "x.png" keeps its directory icon; matching ignores case.
                ext = os.path.splitext(name)[1][1:].lower()
                if ext in self.image_ext:
                    fileImgClass = ''
                    # The name lands inside an HTML attribute and a CSS url():
                    # percent-encode it so quotes cannot break out of either.
                    specialImgStyle = img_style_templ.substitute(name=urllib.parse.quote(name, errors='surrogatepass'))
                else:
                    fileImgClass = self.ext_to_class.get(ext, fileImgClass)
            # Note: a link to a directory displays with @ and links with /
            r.append(file_html_templ.substitute(fileImgClass=fileImgClass, specialImgStyle=specialImgStyle, linkname=urllib.parse.quote(linkname, errors='surrogatepass'), target=target, displayname=html.escape(displayname), fsize=fsize, created_date=created_date, deleteForm=deleteForm))
            usedClass.add(fileImgClass)
        r.append('</table>')
        return '\n'.join(r), self.list_directory_head_html_additional(usedClass)
def run(HandlerClass = FileUploadRequestHandlerTableView,
        ServerClass = http.server.HTTPServer,
        port = 8000,
        protocol = "HTTP/1.0",
        bind = ""):
    """
    This runs an HTTPS server on port 8000 (or the port argument).

    The certificate (and its private key) is loaded from the file named by
    the module-level ``certfile``.  The function serves until interrupted
    with Ctrl-C and then exits the process with status 0.
    """
    global certfile
    server_address = (bind, port)
    HandlerClass.protocol_version = protocol
    httpd = ServerClass(server_address, HandlerClass)
    # ssl.wrap_socket() is deprecated since Python 3.7 and was removed in
    # 3.12; an SSLContext is the supported way to wrap the listening socket.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(certfile)
    httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
    sa = httpd.socket.getsockname()
    print("Serving HTTPS on", sa[0], "port", sa[1], "...")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\nKeyboard interrupt received, exiting.")
        httpd.server_close()
        sys.exit(0)
if __name__ == '__main__':
    # All three arguments are required; with only two, sys.argv[3] below
    # used to fail with IndexError instead of printing the usage.
    if len(sys.argv) < 4:
        print(sys.argv[0] + " <port> <user>:<pw> <cert.pem>")
        sys.exit(1)
    port = int(sys.argv[1])
    # Module-level globals read by AuthRequestHandler.is_authenticated() (key)
    # and run() (certfile).
    key = base64.b64encode(sys.argv[2].encode("utf-8"))
    certfile = sys.argv[3]
    run(port=port)
@image72
Copy link

image72 commented Jun 29, 2023

ssl server may optional by args, disabled by default

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment