Skip to content

Instantly share code, notes, and snippets.

@afiodorov
Created November 29, 2023 22:39
Show Gist options
  • Save afiodorov/1f4537c27f9ffac4cfe035bae297d840 to your computer and use it in GitHub Desktop.
Save afiodorov/1f4537c27f9ffac4cfe035bae297d840 to your computer and use it in GitHub Desktop.
Websocket proxy
import asyncio
import websockets
async def client_to_server(client, server):
async for message in client:
await server.send(message)
async def server_to_client(client, server):
async for message in server:
await client.send(message)
async def proxy(server, path):
target_uri = f"wss://api.deepgram.com{path}"
api_key = "token"
headers = {"Authorization": f"Token {api_key}"}
try:
async with websockets.connect(target_uri, extra_headers=headers) as client, asyncio.TaskGroup() as tg:
tg.create_task(client_to_server(client, server))
tg.create_task(server_to_client(client, server))
except ExceptionGroup as e:
print(e)
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(websockets.serve(proxy, "0.0.0.0", 8765))
asyncio.get_event_loop().run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment