Skip to content

Instantly share code, notes, and snippets.

@svagionitis
Last active January 11, 2021 16:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save svagionitis/fd0ce903eb678492adcbcc3e1b4f8422 to your computer and use it in GitHub Desktop.
Save svagionitis/fd0ce903eb678492adcbcc3e1b4f8422 to your computer and use it in GitHub Desktop.
A websocket proxy example in python
## https://github.com/github/gitignore/blob/master/Global/Vim.gitignore
# Swap
[._]*.s[a-v][a-z]
!*.svg # comment out if you don't need vector files
[._]*.sw[a-p]
[._]s[a-rt-v][a-z]
[._]ss[a-gi-z]
[._]sw[a-p]
# Session
Session.vim
Sessionx.vim
# Temporary
.netrwhist
*~
# Auto-generated tag files
tags
# Persistent undo
[._]*.un~
## https://github.com/github/gitignore/blob/master/Python.gitignore
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
log-level = debug
proxy-ws-server-host = localhost
proxy-ws-server-port = 9999
remote-ws-server-host = localhost
remote-ws-server-port = 9991
#!/usr/bin/env python
"""
A websocket proxy example
"""
import asyncio
import sys
import os
import logging
import websockets
import websockets.exceptions
import configargparse
LOGGER = logging.getLogger(__name__)
async def sendMessageFromRemoteServerToRemoteClient(proxy_connections):
"""
Send the message from the remote server to the remote client
proxy_connections: A dictionary data structure holding the websocket remote connections
"""
# Get the messages from the remote server...
async for message in proxy_connections["WS_REMOTE_SERVER"]:
LOGGER.info("Incoming message from server: %s", message)
# ...and send them to the remote client
await proxy_connections["WS_REMOTE_CLI"].send(message)
async def sendMessageFromRemoteClientToRemoteServer(proxy_connections):
"""
Send a message from the remote client to the remote server
proxy_connections: A dictionary data structure holding the websocket remote connections
"""
# Get the messages from the remote client...
async for message in proxy_connections["WS_REMOTE_CLI"]:
LOGGER.info("Incoming message from client: %s", message)
# ...and send them to the remote server
await proxy_connections["WS_REMOTE_SERVER"].send(message)
async def handler(ws_from_remote_client_to_proxy_server, path):
"""
The handler of the proxy websocket server.
This handler will create a proxy client connection to the remote server REMOTE_WS_SERVER_URI
ws_from_remote_client_to_proxy_server: The websocket connection from the remote client
path: The path of the URI of the remote client
"""
LOGGER.debug(
"ws_from_remote_client: %s remote connection: %s:%s path: %s",
ws_from_remote_client_to_proxy_server,
ws_from_remote_client_to_proxy_server.host,
ws_from_remote_client_to_proxy_server.port,
path,
)
try:
# Use of ping_timeout and ping_interval None in order not to terminate the proxy client to remote server connection
async with websockets.connect(
REMOTE_WS_SERVER_URI, ping_timeout=None, ping_interval=None
) as ws_from_proxy_client_to_remote_server:
LOGGER.debug(
"ws_to_remote_server: %s remote connection: %s:%s",
ws_from_proxy_client_to_remote_server,
ws_from_proxy_client_to_remote_server.host,
ws_from_proxy_client_to_remote_server.port,
)
proxy_conns = {
"WS_REMOTE_CLI": ws_from_remote_client_to_proxy_server,
"WS_REMOTE_SERVER": ws_from_proxy_client_to_remote_server,
}
INCOMING_CLIENTS.append(proxy_conns)
LOGGER.debug("INCOMING_CLIENTS: %s", INCOMING_CLIENTS)
await asyncio.gather(
sendMessageFromRemoteServerToRemoteClient(proxy_conns),
sendMessageFromRemoteClientToRemoteServer(proxy_conns),
)
except websockets.exceptions.ConnectionClosedError:
# When closing the remote client, the proxy client is not closing properly emmiting the ConnectionClosedError
INCOMING_CLIENTS.remove(proxy_conns)
LOGGER.debug("INCOMING_CLIENTS: %s", INCOMING_CLIENTS)
pass
def parseArgs(script_args, config_file="ws_proxy.conf"):
"""
Parse the configuration arguments
If no input arguments is given, it will read them from the configuration file ws_proxy.conf
script_args: Input arguments
config_file: The configuration file. Default value ws_proxy.conf
"""
def _get_log_level_names():
return [
logging.getLevelName(v).lower()
for v in sorted(
getattr(logging, "_levelToName", None) or logging._levelNames
)
if getattr(v, "real", 0)
]
LOG_LEVEL_NAMES = _get_log_level_names()
cwd = os.path.dirname(os.path.realpath(__file__))
default_config_files = [os.path.join(cdir, config_file) for cdir in (cwd, ".")]
conf = configargparse.ArgParser(default_config_files=default_config_files)
conf.add(
"-c",
"--my-config",
required=False,
is_config_file=True,
help="config file path",
)
conf.add(
"-l",
"--log-level",
required=True,
env_var="LOG_LEVEL",
choices=LOG_LEVEL_NAMES,
help="Set the logging level",
)
conf.add(
"--proxy-ws-server-host",
env_var="PROXY_WS_SERVER_HOST",
required=True,
type=str,
help="The proxy websocket server host",
)
conf.add(
"--proxy-ws-server-port",
env_var="PROXY_WS_SERVER_PORT",
required=True,
type=int,
help="The proxy websocket server port",
)
conf.add(
"--remote-ws-server-host",
env_var="REMOTE_WS_SERVER_HOST",
required=True,
type=str,
help="The remote websocket server host",
)
conf.add(
"--remote-ws-server-port",
env_var="REMOTE_WS_SERVER_PORT",
required=True,
type=int,
help="The remote websocket server port",
)
return conf.parse_args(script_args)
def setupLogging(level):
"""
Setup the logging levels for LOGGER
level: Logging level to set
"""
fmt = "%(asctime)s %(levelname)s: %(message)s [%(name)s:%(funcName)s:%(lineno)d] "
logging.basicConfig(level=logging.getLevelName(str(level).upper()), format=fmt)
LOGGER.info("Log level set to %s", level)
def main():
"""
Main function
"""
global REMOTE_WS_SERVER_URI
global INCOMING_CLIENTS
config = parseArgs(sys.argv[1:])
LOGGER.debug(config)
setupLogging(config.log_level)
# A list of the incoming clients
INCOMING_CLIENTS = list()
LOGGER.debug("INCOMING_CLIENTS: %s", INCOMING_CLIENTS)
REMOTE_WS_SERVER_URI = "ws://%s:%s" % (
config.remote_ws_server_host,
config.remote_ws_server_port,
)
proxy_ws_server = websockets.serve(
handler, config.proxy_ws_server_host, config.proxy_ws_server_port
)
asyncio.get_event_loop().run_until_complete(proxy_ws_server)
asyncio.get_event_loop().run_forever()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment