Skip to content

Instantly share code, notes, and snippets.

@pgrandinetti
Last active January 22, 2024 21:55
Show Gist options
  • Save pgrandinetti/964747a9f2464e576b8c6725da12c1eb to your computer and use it in GitHub Desktop.
Save pgrandinetti/964747a9f2464e576b8c6725da12c1eb to your computer and use it in GitHub Desktop.
Automatic reconnect for a websockets client
import socket
import asyncio
import websockets
import time
import logging
import argparse
import threading
import sys
# Route log records of every level (DEBUG and up) to stdout, each one
# prefixed with its timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
    stream=sys.stdout,
)

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class WSClient():
    """Websocket listener that reconnects automatically when the link drops.

    Args:
        url: websocket endpoint to connect to.
        **kwargs: optional settings; each one falls back to its default
            when it is missing or ``None``:

            * ``reply_timeout`` -- seconds to wait for a message before
              probing the connection with a ping (default 10).
            * ``ping_timeout`` -- seconds to wait for the pong (default 5).
            * ``sleep_time`` -- seconds to sleep before reconnecting
              (default 5).
            * ``callback`` -- callable invoked with every received message
              (default: no callback).
    """

    def __init__(self, url, **kwargs):
        self.url = url
        # set some default values
        self.reply_timeout = self._option(kwargs, 'reply_timeout', 10)
        self.ping_timeout = self._option(kwargs, 'ping_timeout', 5)
        self.sleep_time = self._option(kwargs, 'sleep_time', 5)
        self.callback = kwargs.get('callback')

    @staticmethod
    def _option(options, name, default):
        """Return ``options[name]``, or ``default`` when absent or ``None``.

        Only ``None`` triggers the default, so an explicit ``0`` is kept.
        """
        value = options.get(name)
        return default if value is None else value

    async def listen_forever(self):
        """Connect, listen, and reconnect in an endless loop.

        This coroutine does not return on its own. Name-resolution failures
        (``socket.gaierror``) and refused connections are logged and retried
        after ``sleep_time`` seconds; any other exception propagates to the
        caller.
        """
        while True:
            # outer loop restarted every time the connection fails
            logger.debug('Creating new connection...')
            try:
                async with websockets.connect(self.url) as ws:
                    await self._listen(ws)
            except socket.gaierror:
                logger.debug(
                    'Socket error - retrying connection in %s sec (Ctrl-C to quit)',
                    self.sleep_time)
                await asyncio.sleep(self.sleep_time)
            except ConnectionRefusedError:
                logger.debug('Nobody seems to listen to this endpoint. Please check the URL.')
                logger.debug(
                    'Retrying connection in %s sec (Ctrl-C to quit)',
                    self.sleep_time)
                await asyncio.sleep(self.sleep_time)

    async def _listen(self, ws):
        """Receive messages on ``ws`` until the connection stops answering.

        Every message is logged and passed to ``callback`` when one is set.
        Returns (after sleeping ``sleep_time`` seconds) once a ping fails,
        which tells ``listen_forever`` to open a new connection.
        """
        while True:
            # listener loop
            try:
                reply = await asyncio.wait_for(ws.recv(), timeout=self.reply_timeout)
            except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
                # Silence or a closed socket: ping to see if the peer is alive.
                try:
                    pong = await ws.ping()
                    await asyncio.wait_for(pong, timeout=self.ping_timeout)
                except Exception:
                    # Deliberately not a bare ``except``: task cancellation
                    # and Ctrl-C are not ``Exception`` subclasses and must
                    # propagate instead of being treated as a ping failure.
                    logger.debug(
                        'Ping error - retrying connection in %s sec (Ctrl-C to quit)',
                        self.sleep_time)
                    await asyncio.sleep(self.sleep_time)
                    return
                logger.debug('Ping OK, keeping connection alive...')
                continue
            logger.debug('Server said > %s', reply)
            if self.callback:
                self.callback(reply)
def start_ws_client(client):
    """Run ``client.listen_forever()`` on a fresh event loop until it ends.

    A new loop is created and installed as the current loop for the calling
    thread. The call blocks until the coroutine finishes or raises; in both
    cases the loop is closed and cleared as the current loop, so it is not
    leaked.

    Args:
        client: object exposing a ``listen_forever()`` coroutine method.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(client.listen_forever())
    finally:
        # Do not leave a closed loop registered as the current one.
        asyncio.set_event_loop(None)
        loop.close()
def callback_fn(data, *args, **kwargs):
    """Example message handler: log a fixed line and ignore the payload.

    Replace the body with your own logic. ``data`` and any extra positional
    or keyword arguments are accepted but currently unused.
    """
    greeting = 'This is the callback speaking!'
    logger.debug(greeting)
def build_arg_parser():
    """Build the command-line parser for the demo client.

    All options are optional. The numeric ones are parsed as ``int`` and
    default to ``None``.

    Returns:
        argparse.ArgumentParser: parser exposing ``url``, ``reply_timeout``,
        ``ping_timeout`` and ``sleep_time``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--url',
                        required=False,
                        # set here your URL
                        default='ws://katiedj.com:8080/macro/sample/',
                        dest='url',
                        help='Websocket URL')
    parser.add_argument('--reply-timeout',
                        required=False,
                        dest='reply_timeout',
                        type=int,
                        default=None,
                        help='Timeout for reply from server')
    parser.add_argument('--ping-timeout',
                        required=False,
                        dest='ping_timeout',
                        # Without a type the value stays a string, which is
                        # not a usable timeout for asyncio.wait_for.
                        type=int,
                        default=None,
                        help='Timeout when pinging the server')
    parser.add_argument('--sleep',
                        required=False,
                        type=int,
                        dest='sleep_time',
                        default=None,
                        help='Sleep time before retrying connection')
    return parser


if __name__ == '__main__':
    args = build_arg_parser().parse_args()
    # The parsed namespace maps one-to-one onto WSClient keyword arguments.
    ws_client = WSClient(**vars(args), callback=callback_fn)
    start_ws_client(ws_client)
@coccoinomane
Copy link

Great, thanks! Have a nice day :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment