Last active
October 17, 2022 21:07
-
-
Save nathants/0e6af87bf786598b290e398839e6f6a8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# The MIT License (MIT) | |
# Copyright (c) 2022-present Nathan Todd-Stone | |
# https://en.wikipedia.org/wiki/MIT_License#License_terms | |
""" | |
send and receive files via https over lan. run on laptop, read qrcode url with phone. | |
""" | |
import logging | |
import re | |
import mimetypes | |
import os | |
import random | |
import secrets | |
import ssl | |
import subprocess | |
import tornado.gen | |
import tornado.ioloop | |
import tornado.log | |
import tornado.escape | |
import tornado.httputil | |
import web # pip install git+https://github.com/nathants/py-{util,pool,web} | |
import os | |
port = cert_port = None | |
logging.basicConfig(level='INFO') | |
check_output = lambda *a: subprocess.check_output(' '.join(map(str, a)), shell=True, executable='/bin/bash', stderr=subprocess.DEVNULL).decode('utf-8').strip() | |
check_call = lambda *a: subprocess.check_call(' '.join(map(str, a)), shell=True, executable='/bin/bash', stderr=subprocess.DEVNULL) | |
def new_port(): | |
try: | |
ports = [int(x) for x in check_output("ss -pt4H|awk '{print $4}'|cut -d: -f2|grep -o '[0-9]*'").splitlines()] | |
except: | |
ports = [] | |
while True: | |
port = random.randint(2000, 65000) | |
if port not in ports: | |
return port | |
token = secrets.token_hex(24) | |
ipv4 = check_output("ifconfig | grep inet | grep netmask | grep broadcast | awk '{print $2}' | grep -v '^172\.' | head -n1") | |
print('ipv4:', ipv4) | |
check_call('mkdir -p ~/secrets') | |
ca_crt = os.path.expanduser(f'~/secrets/{ipv4}.ca.crt') | |
crt = os.path.expanduser(f'~/secrets/{ipv4}.crt') | |
key = os.path.expanduser(f'~/secrets/{ipv4}.key') | |
csr = os.path.expanduser(f'~/secrets/{ipv4}.csr') | |
if not (os.path.isfile(crt) and os.path.isfile(key)): | |
print('make certificates:', crt, key) | |
ca_key = check_output(f'openssl genrsa -out /dev/stdout 4096') | |
check_call(f'echo "{ca_key}" | openssl req -x509 -nodes -key /dev/stdin -out {ca_crt} -days 9999 -subj "/CN={ipv4}/O=Fake Name/C=US" -sha256') | |
check_call(f'openssl genrsa -out {key} 4096') | |
check_call(f'openssl req -new -sha256 -key {key} -subj "/C=US/O=Fake Name/CN={ipv4}" -out {csr}') | |
check_call(f'echo "{ca_key}" | openssl x509 -req -in {csr} -CA {ca_crt} -CAkey /dev/stdin -CAcreateserial -out {crt} -days 9999 -sha256') | |
new_cert = True | |
else: | |
new_cert = False | |
print('use existing certificates:', crt, key) | |
if new_cert or 'LOAD_CERT' in os.environ: | |
@tornado.gen.coroutine | |
def get_crt(request): | |
yield None | |
headers = {'Content-Disposition': f"attachment; filename={os.path.basename(ca_crt)}", | |
'Content-Type': 'application/x-x509-ca-cert'} | |
with open(ca_crt, 'rb') as f: | |
return {'code': 200, 'headers': headers, 'body': f.read()} | |
app = web.app([(f'/{token}', {'get': get_crt})]) | |
cert_port = new_port() | |
app.listen(cert_port) | |
print('load the certificate:') | |
print(check_output(f'echo http://{ipv4}:{cert_port}/{token} | qrencode -t UTF8'), flush=True) | |
@tornado.gen.coroutine | |
def home(request): | |
yield None | |
return {'code': 200, 'body': f'<p><a href="/browse/{token}">browse</a></p><p><a href="/upload/{token}">upload</a></p>'} | |
@tornado.gen.coroutine | |
def browse(request): | |
yield None | |
return {'code': 200, 'body': '<br/>'.join([f'<a href={request["url"].replace("/browse/" + token, "/load/" + token)}/{path}>{path}</a>' for path in sorted(os.listdir('.'))])} | |
@tornado.gen.coroutine | |
def load(request): | |
yield None | |
path = request['kwargs']['path'] | |
headers = {'Content-Disposition': f"attachment; filename={path}"} | |
mime = mimetypes.guess_type(path, strict=False)[0] | |
if path.endswith('.gpg'): | |
mime = 'application/gpg' | |
if mime: | |
headers['Content-Type'] = mime | |
with open(path, 'rb') as f: | |
return {'code': 200, 'headers': headers, 'body': f.read()} | |
body = """ | |
<p> | |
<form method="post" enctype="multipart/form-data"> | |
<p><input type="file" name="message"></p> | |
<p><input type="submit"/></p> | |
</form> | |
</p> | |
<p> | |
<form method="post"> | |
<p><textarea rows=4 cols=50 name="message"></textarea></p> | |
<p><input type="submit"/></p> | |
</form> | |
</p> | |
""" | |
@tornado.gen.coroutine | |
def upload_get(request): | |
yield None | |
return {'code': 200, | |
'body': body} | |
@tornado.gen.coroutine | |
def upload_post(request): | |
yield None | |
if request['files']: | |
try: | |
name = re.findall(rb'filename="([^"]+)"', request['body'])[0] | |
except TypeError: | |
name = re.findall(r'filename="([^"]+)"', request['body'])[0] | |
with open(name, 'wb') as f: | |
for part in request['files']['message']: | |
f.write(part['body']) | |
else: | |
content = request['body'].decode() | |
content = content.split('message=', 1)[1] | |
content = tornado.escape.url_unescape(content) | |
with open('upload.txt', 'w') as f: | |
f.write(content) | |
return {'code': 200} | |
app = web.app([(f'/{token}', {'get': home}), | |
(f'/browse/{token}', {'get': browse}), | |
(f'/load/{token}/:path', {'get': load}), | |
(f'/upload/{token}', {'get': upload_get, | |
'post': upload_post})], | |
debug=True) | |
if __name__ == '__main__': | |
options = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) | |
options.load_cert_chain(crt, key) | |
port = new_port() | |
app.listen(port, ssl_options=options) | |
print(check_output(f'echo https://{ipv4}:{port}/{token} | tee >(qrencode -t UTF8)'), flush=True) | |
tornado.ioloop.IOLoop.current().start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment