Skip to content

Instantly share code, notes, and snippets.

@wieshka
Last active October 6, 2023 05:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save wieshka/024dcf9bd632269d45e4f373e42be51b to your computer and use it in GitHub Desktop.
Save wieshka/024dcf9bd632269d45e4f373e42be51b to your computer and use it in GitHub Desktop.
WSE middleware
#!/usr/bin/env python3
import argparse
import asyncio
import websockets
import ssl
# TLS settings for the outgoing (upstream) websocket connection.
# The default upstream endpoint is wss://localhost, whose certificate cannot be
# validated against that hostname, so certificate verification is switched off.
# NOTE: this disables verification for whatever endpoint is configured, not
# only localhost, so the upstream connection is not protected against
# man-in-the-middle attacks.
ssl_context = ssl.create_default_context()
# Hostname checking has to be turned off first: the ssl module refuses to set
# verify_mode to CERT_NONE while check_hostname is still enabled.
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
async def middleware(websocket, path=None):
    """Bridge one incoming client connection to the upstream endpoint.

    Opens a websocket connection to ``WOWZA_ENDPOINT + path`` and relays
    messages in both directions until either side stops, then tears down
    the other direction as well.

    Args:
        websocket: The accepted client connection handed over by
            ``websockets.serve``.
        path: Request path of the incoming connection. Optional, because
            newer websockets releases call the handler with the connection
            only; in that case the path is read from the connection object.

    Raises:
        Whatever exception ended the relay that finished first (for
        example a connection-closed error), so the server can log it.
    """
    if path is None:
        # New-style connections expose the handshake as ``.request``;
        # legacy protocol objects expose ``.path`` directly.
        request = getattr(websocket, "request", None)
        path = request.path if request is not None else websocket.path

    # Re-use the original request path instead of hardcoding it.
    url = WOWZA_ENDPOINT + path
    async with websockets.connect(url, ssl=ssl_context) as ws:
        relays = [
            asyncio.create_task(send_from_client_to_server(ws, websocket)),
            asyncio.create_task(send_from_server_to_client(ws, websocket)),
        ]
        try:
            # Stop as soon as EITHER direction ends. Awaiting the tasks one
            # after the other would keep the upstream connection open after
            # the client has already gone away (and vice versa).
            finished, _ = await asyncio.wait(
                relays, return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            # cancel() is a no-op for tasks that already completed.
            for relay in relays:
                relay.cancel()
            await asyncio.gather(*relays, return_exceptions=True)
        # Surface the error (if any) of the relay that ended the session.
        for relay in finished:
            relay.result()
async def send_from_client_to_server(ws, websocket):
    """Forward every message received on ``ws`` to ``websocket``.

    NOTE: the name is historical. ``middleware`` passes the upstream
    connection as ``ws`` and the incoming client connection as
    ``websocket``, so in practice this relays upstream -> client.

    Args:
        ws: Async-iterable message source.
        websocket: Destination object with an awaitable ``send(message)``.

    Returns when iteration over ``ws`` ends.
    """
    async for message in ws:
        await websocket.send(message)
async def send_from_server_to_client(ws, websocket):
    """Forward every message received on ``websocket`` to ``ws``.

    NOTE: the name is historical. ``middleware`` passes the incoming
    client connection as ``websocket`` and the upstream connection as
    ``ws``, so in practice this relays client -> upstream.

    Args:
        ws: Destination object with an awaitable ``send(message)``.
        websocket: Async-iterable message source.

    Returns when iteration over ``websocket`` ends.
    """
    async for message in websocket:
        await ws.send(message)
async def _run_proxy(host, port):
    """Serve ``middleware`` on ``host``:``port`` until cancelled."""
    async with websockets.serve(middleware, host, port):
        # A bare future never completes: it keeps the server alive until
        # the process is interrupted or this coroutine is cancelled.
        await asyncio.Future()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='websocket proxy.')
    parser.add_argument('--host', help='Host to bind to.',
                        default='localhost')
    # type=int so a port given on the command line has the same type as the
    # default and invalid values are rejected by argparse up front.
    parser.add_argument('--port', help='Port to bind to.',
                        type=int, default=1936)
    parser.add_argument('--wowza_endpoint', help='Remote websocket url',
                        default='wss://localhost:8400')
    args = parser.parse_args()

    # Module-level global read by middleware() for every connection.
    WOWZA_ENDPOINT = args.wowza_endpoint

    # asyncio.run creates and owns the event loop, so the server is built
    # inside a running loop instead of via the deprecated get_event_loop().
    asyncio.run(_run_proxy(args.host, args.port))
# systemd unit that runs the websocket middleware script as a service.
[Unit]
Description=Middleware WSE proxy
# NOTE(review): this orders the service after the same target that pulls it
# in via WantedBy= below; confirm this is intended (ordering after a network
# target may be what is actually wanted).
After=multi-user.target
[Service]
Type=idle
# The script is executed directly with no interpreter and no arguments, so it
# relies on its shebang line and runs with its built-in argument defaults.
# Presumably /opt/middleware.py must be marked executable - verify on deploy.
ExecStart=/opt/middleware.py
# Restart the proxy when it exits with a failure.
Restart=on-failure
[Install]
WantedBy=multi-user.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment