Skip to content

Instantly share code, notes, and snippets.

@jedie
Created January 2, 2022 18:49
Show Gist options
  • Save jedie/2476f973fe8c255b1cde8f9fdeae1bd2 to your computer and use it in GitHub Desktop.
Save jedie/2476f973fe8c255b1cde8f9fdeae1bd2 to your computer and use it in GitHub Desktop.
Web Server to redirect YouTube to invidious ... But is practically unusable because the browser rejects the self-signed certificate ;)
import asyncio
import random
import ssl
from asyncio import StreamReader, StreamWriter
from pathlib import Path
from OpenSSL import crypto
BASE_PATH = Path(__file__).parent
def generate_self_signed_certificate(key_path: Path, cert_path: Path, common_name: str):
"""
Generates an RSA and self-signed X509 certificate for use with TLS/SSL.
"""
rsa = crypto.PKey()
rsa.generate_key(crypto.TYPE_RSA, 2048)
cert = crypto.X509()
cert.get_subject().CN = common_name
cert.set_serial_number(random.getrandbits(1024))
cert.gmtime_adj_notBefore(0) # Now
cert.gmtime_adj_notAfter(24 * 60 * 60) # 1 day
cert.set_issuer(cert.get_subject())
cert.set_pubkey(rsa)
cert.sign(rsa, digest='sha256')
key_path.write_bytes(
crypto.dump_privatekey(crypto.FILETYPE_PEM, rsa)
)
cert_path.write_bytes(
crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
)
class WebServer:
def __init__(self, ssl_context, server_hostname, destination):
self.ssl_context = ssl_context
self.server_hostname = server_hostname
self.destination = destination
async def send_response(self, reader: StreamReader, writer: StreamWriter):
peername = writer.get_extra_info('peername')
print('peername:', *peername)
sockname = writer.get_extra_info('sockname')
print('sockname:', *sockname)
server_ip, server_port = sockname
read = True
if server_port == 443:
transport = writer.transport
protocol = transport.get_protocol()
loop = asyncio.get_event_loop()
try:
new_transport = await loop.start_tls(
transport=transport,
protocol=protocol,
sslcontext=self.ssl_context,
server_side=True,
server_hostname=self.server_hostname
)
except ssl.SSLError as err:
print(f'SSLError: {err}')
read = False
else:
reader._transport = new_transport
writer._transport = new_transport
print('Write...')
writer.write(b'HTTP/1.1 302 Found\r\n')
writer.write(b'Location %s\r\n\r\n' % self.destination.encode('utf-8'))
await writer.drain()
if read:
print('Read...')
data = await reader.readline()
print(repr(data))
reader.feed_eof()
writer.close()
async def request_handler(self, reader, writer):
print('_' * 100)
print('Request incoming...')
await self.send_response(reader, writer)
print('-' * 100)
def run(self):
loop = asyncio.get_event_loop()
loop.create_task(asyncio.start_server(self.request_handler, '0.0.0.0', 80))
loop.create_task(asyncio.start_server(self.request_handler, '0.0.0.0', 443))
print(f'Fake "{self.server_hostname}" server started...')
loop.run_forever()
def sni_callback(sslobj, servername, sslctx):
print(f'*** Request for: "{servername}" ***')
def msg_callback(
conn,
direction, # ``read`` or ``write``
version: ssl.TLSVersion,
content_type: ssl._TLSContentType,
msg_type: ssl._TLSMessageType,
data: bytes # Raw, decrypted message content as bytes
):
print('\t', version.name, direction, len(data), content_type.name, msg_type.name)
def main(domain, destination):
certfile_name = f'{domain}_server.crt'
keyfile_name = f'{domain}_server.key'
certfile_path = BASE_PATH / certfile_name
keyfile_path = BASE_PATH / keyfile_name
generate_self_signed_certificate(
keyfile_path, certfile_path, common_name=domain
)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile=certfile_path, keyfile=keyfile_path)
# ssl_context.verify_mode = ssl.CERT_NONE
# ssl_context.check_hostname = False
ssl_context.sni_callback = sni_callback
ssl_context._msg_callback = msg_callback
server = WebServer(
ssl_context=ssl_context,
server_hostname=domain,
destination=destination
)
server.run()
if __name__ == '__main__':
main(domain='www.youtube.com', destination='https://invidious.foobar.tld')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment