Skip to content

Instantly share code, notes, and snippets.

@nathants
Last active October 17, 2022 21:07
Show Gist options
  • Save nathants/0e6af87bf786598b290e398839e6f6a8 to your computer and use it in GitHub Desktop.
Save nathants/0e6af87bf786598b290e398839e6f6a8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# The MIT License (MIT)
# Copyright (c) 2022-present Nathan Todd-Stone
# https://en.wikipedia.org/wiki/MIT_License#License_terms
"""
send and receive files via https over lan. run on laptop, read qrcode url with phone.
"""
import logging
import re
import mimetypes
import os
import random
import secrets
import ssl
import subprocess
import tornado.gen
import tornado.ioloop
import tornado.log
import tornado.escape
import tornado.httputil
import web # pip install git+https://github.com/nathants/py-{util,pool,web}
import os
port = cert_port = None
logging.basicConfig(level='INFO')
check_output = lambda *a: subprocess.check_output(' '.join(map(str, a)), shell=True, executable='/bin/bash', stderr=subprocess.DEVNULL).decode('utf-8').strip()
check_call = lambda *a: subprocess.check_call(' '.join(map(str, a)), shell=True, executable='/bin/bash', stderr=subprocess.DEVNULL)
def new_port():
try:
ports = [int(x) for x in check_output("ss -pt4H|awk '{print $4}'|cut -d: -f2|grep -o '[0-9]*'").splitlines()]
except:
ports = []
while True:
port = random.randint(2000, 65000)
if port not in ports:
return port
token = secrets.token_hex(24)
ipv4 = check_output("ifconfig | grep inet | grep netmask | grep broadcast | awk '{print $2}' | grep -v '^172\.' | head -n1")
print('ipv4:', ipv4)
check_call('mkdir -p ~/secrets')
ca_crt = os.path.expanduser(f'~/secrets/{ipv4}.ca.crt')
crt = os.path.expanduser(f'~/secrets/{ipv4}.crt')
key = os.path.expanduser(f'~/secrets/{ipv4}.key')
csr = os.path.expanduser(f'~/secrets/{ipv4}.csr')
if not (os.path.isfile(crt) and os.path.isfile(key)):
print('make certificates:', crt, key)
ca_key = check_output(f'openssl genrsa -out /dev/stdout 4096')
check_call(f'echo "{ca_key}" | openssl req -x509 -nodes -key /dev/stdin -out {ca_crt} -days 9999 -subj "/CN={ipv4}/O=Fake Name/C=US" -sha256')
check_call(f'openssl genrsa -out {key} 4096')
check_call(f'openssl req -new -sha256 -key {key} -subj "/C=US/O=Fake Name/CN={ipv4}" -out {csr}')
check_call(f'echo "{ca_key}" | openssl x509 -req -in {csr} -CA {ca_crt} -CAkey /dev/stdin -CAcreateserial -out {crt} -days 9999 -sha256')
new_cert = True
else:
new_cert = False
print('use existing certificates:', crt, key)
if new_cert or 'LOAD_CERT' in os.environ:
@tornado.gen.coroutine
def get_crt(request):
yield None
headers = {'Content-Disposition': f"attachment; filename={os.path.basename(ca_crt)}",
'Content-Type': 'application/x-x509-ca-cert'}
with open(ca_crt, 'rb') as f:
return {'code': 200, 'headers': headers, 'body': f.read()}
app = web.app([(f'/{token}', {'get': get_crt})])
cert_port = new_port()
app.listen(cert_port)
print('load the certificate:')
print(check_output(f'echo http://{ipv4}:{cert_port}/{token} | qrencode -t UTF8'), flush=True)
@tornado.gen.coroutine
def home(request):
yield None
return {'code': 200, 'body': f'<p><a href="/browse/{token}">browse</a></p><p><a href="/upload/{token}">upload</a></p>'}
@tornado.gen.coroutine
def browse(request):
yield None
return {'code': 200, 'body': '<br/>'.join([f'<a href={request["url"].replace("/browse/" + token, "/load/" + token)}/{path}>{path}</a>' for path in sorted(os.listdir('.'))])}
@tornado.gen.coroutine
def load(request):
yield None
path = request['kwargs']['path']
headers = {'Content-Disposition': f"attachment; filename={path}"}
mime = mimetypes.guess_type(path, strict=False)[0]
if path.endswith('.gpg'):
mime = 'application/gpg'
if mime:
headers['Content-Type'] = mime
with open(path, 'rb') as f:
return {'code': 200, 'headers': headers, 'body': f.read()}
body = """
<p>
<form method="post" enctype="multipart/form-data">
<p><input type="file" name="message"></p>
<p><input type="submit"/></p>
</form>
</p>
<p>
<form method="post">
<p><textarea rows=4 cols=50 name="message"></textarea></p>
<p><input type="submit"/></p>
</form>
</p>
"""
@tornado.gen.coroutine
def upload_get(request):
yield None
return {'code': 200,
'body': body}
@tornado.gen.coroutine
def upload_post(request):
yield None
if request['files']:
try:
name = re.findall(rb'filename="([^"]+)"', request['body'])[0]
except TypeError:
name = re.findall(r'filename="([^"]+)"', request['body'])[0]
with open(name, 'wb') as f:
for part in request['files']['message']:
f.write(part['body'])
else:
content = request['body'].decode()
content = content.split('message=', 1)[1]
content = tornado.escape.url_unescape(content)
with open('upload.txt', 'w') as f:
f.write(content)
return {'code': 200}
app = web.app([(f'/{token}', {'get': home}),
(f'/browse/{token}', {'get': browse}),
(f'/load/{token}/:path', {'get': load}),
(f'/upload/{token}', {'get': upload_get,
'post': upload_post})],
debug=True)
if __name__ == '__main__':
options = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
options.load_cert_chain(crt, key)
port = new_port()
app.listen(port, ssl_options=options)
print(check_output(f'echo https://{ipv4}:{port}/{token} | tee >(qrencode -t UTF8)'), flush=True)
tornado.ioloop.IOLoop.current().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment