Skip to content

Instantly share code, notes, and snippets.

@ducnguyen6431
Created December 23, 2019 12:56
Show Gist options
  • Save ducnguyen6431/478bf603bbc2c52ee3bfd7a7cbbfff30 to your computer and use it in GitHub Desktop.
Save ducnguyen6431/478bf603bbc2c52ee3bfd7a7cbbfff30 to your computer and use it in GitHub Desktop.
# change these values as you see fit
# after running `sudo ./install`, the server should run at http://host:port/base_url
# Host info
host = ''
port = 0
database = 'simple_server.db'
# Auth
username = ''
password = ''
# Misc
base_url = ''
# for example, with above default values, the server should run at 127.0.0.1:5000/
#!/usr/bin/env python
__author__ = "dn6431"
__contributor__ = ["bones7456", "wonjohnchoi"]
__description__ = """Simple HTTP Server With Upload and Authentication and more
This module is built on HTTPServer by implementing the standard GET
and HEAD requests in a fairly straight forward manner"""
__epilog__ = """Created by: {}
Contributor: {} {}""".format(__author__, __contributor__[0], __contributor__[1])
import os
import posixpath
from http.server import HTTPServer, BaseHTTPRequestHandler, DEFAULT_ERROR_MESSAGE
from http import HTTPStatus
import requests
import html
import shutil
import mimetypes
from io import BytesIO
import re
import sys
import base64
import settings
from functools import partial
import json
def gen_key(username='', password=''):
return base64.b64encode('{}:{}'.format(username, password).encode('utf-8'))
class Counter:
def __init__(self):
import sqlite3
print("Creating database and table counter")
self.conn = sqlite3.connect(settings.database)
self.cursor = self.conn.cursor()
self.cursor.execute('''create table if not exists counter(full_path text primary key, count integer)''')
def read_counter(self, path):
# Check how many time file is visited
self.cursor = self.conn.cursor()
self.cursor.execute('select * from counter where full_path=?', (path,))
row = self.cursor.fetchone()
count = 0
if row is not None:
count = row[-1]
return count
def increase_counter(self, path):
# Increase number of time file is visited
res = self.read_counter(path)
res += 1
print("{} was visited {} times".format(path, res))
self.cursor = self.conn.cursor()
self.cursor.execute('replace into counter(full_path, count) values(?, ?)', (path, res))
self.conn.commit()
if self.cursor.rowcount > 0:
print("Successfully updated database")
def copyfile(source, destination):
shutil.copyfileobj(source, destination)
class SimpleServer(BaseHTTPRequestHandler):
counter = Counter()
def __init__(self, *args, directory=None, username=None, password=None, secure=False, **kwargs):
if directory:
self.directory = directory
else:
self.directory = os.getcwd()
self.username = username
self.password = password
self.secure = secure
super().__init__(*args, **kwargs)
def do_HEAD(self):
"""Serve a HEAD request"""
f = self.send_head()
if f:
f.close()
def do_GET(self):
if self.username and not self.authenticate():
return
f = self.send_head()
if f:
copyfile(f, self.wfile)
f.close()
def do_POST(self):
if self.username and not self.authenticate():
return
result, info = self.deal_post_data()
print('{} by {} @ {}'.format(info, self.client_address[0], self.client_address[1]))
f = BytesIO()
response = {"saved files": info}
self.send_response(200)
if type(info) is list:
f.write(json.dumps(response).encode('utf-8'))
self.send_header("Content-type", "text/json")
else:
self.send_header("Content-type", "text/html")
f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">'.encode('utf-8'))
f.write('<html>\n<title>Upload Result Page</title>\n'.encode('utf-8'))
f.write('<body>\n<h2>Upload Result Page</h2>\n'.encode('utf-8'))
f.write('<hr>\n'.encode('utf-8'))
f.write('<strong>Failed:</strong>'.encode('utf-8'))
f.write(info.encode('utf-8'))
f.write('<br><a href="{}">back</a>'.format(self.headers['referer']).encode('utf-8'))
f.write('<hr><small>Powerd By: bones7456, check new version at '.encode('utf-8'))
f.write('<a href="http://li2z.cn/?s=SimpleHTTPServerWithUpload">'.encode('utf-8'))
f.write('</body>\n</html>\n'.encode('utf-8'))
length = f.tell()
f.seek(0)
self.send_header("Content-Length", str(length))
self.end_headers()
if f:
print('writing response')
copyfile(f, self.wfile)
print('end response')
f.close()
def deal_post_data(self):
if 'Content-Type' not in self.headers.keys():
return False, "Cannot determine content type"
boundary = self.headers['Content-Type'].split("=")[-1]
if 'Content-Length' not in self.headers.keys():
return False, "Cannot determine content length"
remain_bytes = int(self.headers['content-length'])
line = self.rfile.readline()
remain_bytes -= len(line)
if boundary not in line.decode():
return False, "Content NOT begin with boundary"
path = self.translate_path(self.path)
if not os.path.exists(path):
os.makedirs(path)
uploaded = []
line = self.rfile.readline()
remain_bytes -= len(line)
result, filename = self.fetch_filename(line)
while result:
result, message, remain_bytes = self.download_file(remain_bytes, boundary, path, filename)
uploaded.append(message)
if not result:
return result, message
else:
if remain_bytes > 0:
line = self.rfile.readline()
remain_bytes -= len(line)
result, filename = self.fetch_filename(line)
else:
result = False
return True, uploaded
def download_file(self, remain_bytes, boundary, path, filename):
file_path = os.path.join(path, filename)
# Read content type of file
line = self.rfile.readline()
remain_bytes -= len(line)
# Empty line before content
line = self.rfile.readline()
remain_bytes -= len(line)
try:
out = open(file_path, 'wb')
except IOError:
return False, "Can't create file to write. Permission required", remain_bytes
preline = self.rfile.readline()
remain_bytes -= len(preline)
while remain_bytes > 0:
line = self.rfile.readline()
remain_bytes -= len(line)
if boundary.encode('utf-8') in line:
out.write(preline)
out.close()
return True, filename, remain_bytes
else:
out.write(preline)
preline = line
return False, "Unexpected end of data stream", remain_bytes
def fetch_filename(self, content):
fn = re.findall(r'Content-Disposition.*name="(.*)"; filename="(.*)"', content.decode())
if not fn or len(fn) < 1 or len(fn[0]) < 2:
return False, "Can't find filename"
return True, fn[0][1]
def do_AUTHHEAD(self):
self.send_response(HTTPStatus.UNAUTHORIZED)
self.send_header('WWW-Authenticate', 'Basic realm="Test"')
self.send_header('Content-type', 'text/html')
self.end_headers()
def authenticate(self):
auth_header = self.headers.get('Authorization')
print("Authenticating with key {}".format(gen_key(self.username, self.password)))
if auth_header and auth_header == 'Basic {}'.format(gen_key(self.username, self.password).decode()):
print("Authenticated")
return True
self.do_AUTHHEAD()
print("Authenticate failed")
self.wfile.write(b'<html><body><h1>Authenticate failed<h1><body><html>')
return False
def send_head(self):
# Translate to local path
path = self.translate_path(self.path)
f = None
if os.path.isdir(path):
if not self.path.endswith('/'):
self.send_response(HTTPStatus.MOVED_PERMANENTLY)
self.send_header("Location", self.path + '/')
self.end_headers()
return f
for index in "index.html", "index.htm":
index = os.path.join(path, index)
if os.path.exists(index):
path = index
break
SimpleServer.counter.increase_counter(path)
if os.path.isdir(path):
return self.list_dir(path)
file_type = self.guess_type(path)
try:
f = open(path, 'rb')
except IOError:
self.send_error(HTTPStatus.NOT_FOUND, "File not found", "We 're sorry")
return f
self.send_response(HTTPStatus.OK)
file_stat = os.fstat(f.fileno())
self.send_header("Content-Type", file_type)
self.send_header("Content-Length", str(file_stat.st_size))
self.send_header("Last-Modified", self.date_time_string(file_stat.st_mtime))
self.end_headers()
return f
def translate_path(self, path):
# abandon query parameters
path = path.split('?', 1)[0]
path = path.split('#', 1)[0]
# Don't forget explicit trailing slash when normalizing. Issue17324
trailing_slash = path.rstrip().endswith('/')
try:
path = requests.utils.unquote(path, errors='surrogatepass')
except UnicodeDecodeError:
path = requests.utils.unquote(path)
path = posixpath.normpath(path)
words = path.split('/')
words = filter(None, words)
path = self.directory
for word in words:
if os.path.dirname(word) or word in (os.curdir, os.pardir):
# Ignore components that are not a simple file/directory name
continue
path = os.path.join(path, word)
if trailing_slash:
path += '/'
return path
def list_dir(self, path):
try:
dir_list = os.listdir(path)
except os.error:
self.send_error(HTTPStatus.NOT_FOUND, "Directory require higher permission")
return None
dir_list.sort(key=lambda a: a.lower())
if path != '/':
dir_list = ['..'] + dir_list
f = BytesIO()
display_path = html.escape(requests.utils.unquote(self.path))
f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">'.encode('utf-8'))
f.write('<html>\n<title>Directory listing for {}</title>\n'.format(display_path).encode('utf-8'))
f.write('<body>\n<h2>Directory listing for {} (frequently used directories are more reddish)</h2>\n'.format(
display_path).encode('utf-8'))
f.write('<hr>\n'.encode('utf-8'))
f.write('<form ENCTYPE="multipart/form-data" method="post">'.encode('utf-8'))
f.write('<input name="file" type="file"/>'.encode('utf-8'))
f.write('<input type="submit" value="upload"/></form>\n'.encode('utf-8'))
f.write('<hr>\n<ul>\n'.encode('utf-8'))
tot_counts = 0
for name in dir_list:
child_file_path = posixpath.normpath(os.path.join(path, name))
counts = self.counter.read_counter(child_file_path)
tot_counts += counts
# avoid divide by zero error
if tot_counts == 0:
tot_counts += 1
for name in dir_list:
child_file_path = posixpath.normpath(os.path.join(path, name))
display_name = link_name = name
# Append / for directories or @ for symbolic links
if os.path.isdir(child_file_path):
display_name = name + "/"
link_name = name + "/"
if os.path.islink(child_file_path):
display_name = name + "@"
# Note: a link to a directory displays with @ and links with /
counts = self.counter.read_counter(child_file_path)
# red portion of rgb value. with **0.2, it's overall more reddish
rgb_r = 255 * (float(counts) / tot_counts) ** 0.2
f.write('<li><a style="color:rgb({},0,0)" href="{}">{}</a>\n'.format(rgb_r, requests.utils.quote(link_name),
html.escape(display_name)).encode(
'utf-8'))
f.write('</ul>\n<hr>\n</body>\n</html>\n'.encode('utf-8'))
length = f.tell()
f.seek(0)
self.send_response(200)
self.send_header("Content-type", "text/html")
self.send_header("Content-Length", str(length))
self.end_headers()
return f
def guess_type(self, path):
base, ext = posixpath.splitext(path)
ext = ext.lower()
if ext in self.extensions_map:
return self.extensions_map[ext]
else:
return self.extensions_map['']
if not mimetypes.inited:
mimetypes.init()
extensions_map = mimetypes.types_map.copy()
extensions_map.update({
'': 'application/octet-stream', # Default
'.py': 'text/plain',
'.c': 'text/plain',
'.h': 'text/plain',
'.swift': 'text/plain'
})
def start_server(handlerClass=BaseHTTPRequestHandler,
serverClass=HTTPServer,
protocol='HTTP/1.0', bind=settings.host, port=settings.port):
server_address = (bind, port)
handlerClass.protocol_version = protocol
with serverClass(server_address, handlerClass) as httpd:
sa = httpd.socket.getsockname()
serve_message = "Serving HTTP on {host} port {port}\nhttp://{host}:{port}/"
print(serve_message.format(host=sa[0], port=sa[1]))
print('Use <Ctrl-C> to stop')
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\nKeyboard interrupt received, exiting.")
sys.exit(0)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(usage=__description__,
description="You can also load args via settings.py",
epilog=__epilog__)
parser.add_argument('--bind', '-b', default='127.0.0.1', metavar='ADDRESS',
help='Specify alternate bind address'
'[default: 127.0.0.1]')
parser.add_argument('--directory', '-d', default=os.getcwd(),
help='Specify root directory'
'[default: current directory]')
parser.add_argument('--username', '-u', metavar='USERNAME',
help='Username to secure server'
'[default: your username]')
parser.add_argument('--password', '-p', metavar='PASSWORD',
help='Password to secure server')
parser.add_argument('port', action='store',
default=14230, type=int,
nargs='?',
help='Specify server port [default: 14230]')
arguments = parser.parse_args()
handler_class = partial(SimpleServer,
directory=arguments.directory,
username=arguments.username,
password=arguments.password)
start_server(handlerClass=handler_class,
bind=arguments.bind if not settings.host.strip() else settings.host.strip(),
port=arguments.port if settings.port == 0 else settings.port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment