@pgrandinetti
Last active January 22, 2024 21:55
Automatic reconnect from websockets
import socket
import asyncio
import websockets
import logging
import argparse
import sys

logger = logging.getLogger(__name__)
logging.basicConfig(
    stream=sys.stdout,
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)


class WSClient:

    def __init__(self, url, **kwargs):
        self.url = url
        # set some default values
        self.reply_timeout = kwargs.get('reply_timeout') or 10
        self.ping_timeout = kwargs.get('ping_timeout') or 5
        self.sleep_time = kwargs.get('sleep_time') or 5
        self.callback = kwargs.get('callback')

    async def listen_forever(self):
        while True:
            # outer loop restarted every time the connection fails
            logger.debug('Creating new connection...')
            try:
                async with websockets.connect(self.url) as ws:
                    while True:
                        # listener loop
                        try:
                            reply = await asyncio.wait_for(ws.recv(), timeout=self.reply_timeout)
                        except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
                            # no message in time (or connection closed): check with a ping
                            try:
                                pong = await ws.ping()
                                await asyncio.wait_for(pong, timeout=self.ping_timeout)
                                logger.debug('Ping OK, keeping connection alive...')
                                continue
                            except Exception:
                                # ping failed: leave the listener loop and reconnect
                                logger.debug(
                                    'Ping error - retrying connection in {} sec (Ctrl-C to quit)'.format(self.sleep_time))
                                await asyncio.sleep(self.sleep_time)
                                break
                        logger.debug('Server said > {}'.format(reply))
                        if self.callback:
                            self.callback(reply)
            except socket.gaierror:
                logger.debug(
                    'Socket error - retrying connection in {} sec (Ctrl-C to quit)'.format(self.sleep_time))
                await asyncio.sleep(self.sleep_time)
                continue
            except ConnectionRefusedError:
                logger.debug('Nobody seems to listen to this endpoint. Please check the URL.')
                logger.debug('Retrying connection in {} sec (Ctrl-C to quit)'.format(self.sleep_time))
                await asyncio.sleep(self.sleep_time)
                continue


def start_ws_client(client):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(client.listen_forever())


def callback_fn(data, *args, **kwargs):
    # Write your logic here
    logger.debug('This is the callback speaking!')  # ignore data


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--url',
                        required=False,
                        # set your URL here
                        default='ws://katiedj.com:8080/macro/sample/',
                        dest='url',
                        help='Websocket URL')
    parser.add_argument('--reply-timeout',
                        required=False,
                        dest='reply_timeout',
                        type=int,
                        help='Timeout for reply from server')
    parser.add_argument('--ping-timeout',
                        required=False,
                        dest='ping_timeout',
                        type=int,
                        default=None,
                        help='Timeout when pinging the server')
    parser.add_argument('--sleep',
                        required=False,
                        type=int,
                        dest='sleep_time',
                        default=None,
                        help='Sleep time before retrying connection')
    args = parser.parse_args()
    ws_client = WSClient(**vars(args), callback=callback_fn)
    start_ws_client(ws_client)
@pgrandinetti
Author

pgrandinetti commented Dec 7, 2019

Based on my project https://github.com/pgrandinetti/katiedj-listeners and on several requests received after my comment here python-websockets/websockets#414

Run it with python automatic_websocket_reconnect.py

Tested with Python 3.7

Output of pip freeze

certifi==2019.11.28
chardet==3.0.4
idna==2.8
requests==2.22.0
urllib3==1.25.7
websockets==8.1

@mjcumming

Thank you for the example. A quick question, not related to reconnecting but to the callback: shouldn't it be invoked via call_soon, call_soon_threadsafe or run_in_executor rather than called directly?

@pgrandinetti
Author

@mjcumming
Good point. To do that, you have to make some changes, because call_soon is a method of the event loop object.

I (very) quickly tested this approach:

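  • Create a method in the WSClient class: def set_loop(self, loop): self.loop = loop
  • Call that method in start_ws_client, after the loop has been created and before it starts running (between lines 69-70): client.set_loop(loop)
  • Replace the direct call self.callback(reply) (line 54) with self.loop.call_soon(self.callback, reply)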

It seems to work fine.
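
The three steps above can be sketched in isolation as follows. This is a minimal illustration, not the gist's code: CallbackDispatcher, dispatch, fake_listener and run_demo are hypothetical names, and the fake listener stands in for the recv() loop so that the example runs without a server.

```python
import asyncio


class CallbackDispatcher:
    """Hypothetical stand-in for the callback-related part of WSClient."""

    def __init__(self, callback):
        self.callback = callback
        self.loop = None

    def set_loop(self, loop):
        # Remember the loop that will run the listener.
        self.loop = loop

    def dispatch(self, reply):
        # Schedule the callback instead of calling it inline. It runs on a
        # later iteration of the event loop; the returned Handle can cancel it.
        return self.loop.call_soon(self.callback, reply)


async def fake_listener(dispatcher, replies):
    # Stands in for the recv() loop: every reply is handed to the dispatcher.
    for reply in replies:
        dispatcher.dispatch(reply)
    # Yield control once so the scheduled callbacks get a chance to run.
    await asyncio.sleep(0)


def run_demo():
    received = []
    loop = asyncio.new_event_loop()
    try:
        dispatcher = CallbackDispatcher(received.append)
        dispatcher.set_loop(loop)
        loop.run_until_complete(fake_listener(dispatcher, ['a', 'b']))
    finally:
        loop.close()
    return received
```

Note that call_soon only defers the callback: it still runs on the event loop thread, so a slow callback delays the listener just as a direct call would.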

That said, I believe whether this is a good approach or not depends on your application. Here's a quote from a SO answer:

Callbacks are also not expected to return anything; they are fire-and-forget routines, trusted to not lock up the whole system by running anything heavy or blocking. call_soon() returns a Handle() instance that only lets you cancel it again (a no-op if it already has been executed). The callbacks are executed next time the event loop checks the callback queue, at which point they (hopefully briefly) block any other work from being done.
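
If the callback is heavy or blocking, run_in_executor (also mentioned in the question above) may be the better fit, since it moves the call to a worker thread. A minimal sketch, assuming a hypothetical blocking handler slow_callback; the ticker coroutine exists only to show that the event loop keeps running in the meantime.

```python
import asyncio
import time


def slow_callback(reply):
    # Hypothetical blocking handler; time.sleep stands in for heavy work.
    time.sleep(0.1)
    return reply.upper()


async def handle_reply(reply):
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor, so the
    # blocking call runs in a worker thread instead of on the event loop.
    return await loop.run_in_executor(None, slow_callback, reply)


async def demo():
    ticks = 0

    async def ticker():
        # Counts how often the event loop gets to run while the callback works.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    task = asyncio.create_task(ticker())
    result = await handle_reply('hello')
    task.cancel()
    return result, ticks
```

Running asyncio.run(demo()) returns the processed reply together with the number of ticks counted while the callback was busy in its thread.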

@chenkaiC4

chenkaiC4 commented Jun 29, 2020

thank you for this example. this example has a callback when receive msg, but if i want to send data, how to get the ws client?

@pgrandinetti
Author

> thank you for this example. this example has a callback when receive msg, but if i want to send data, how to get the ws client?

@chenkaiC4 This is a listener (client), not a producer (server). You can use this code to listen to data that are streamed by some server.
In this example, I listen to the data streamed by the server at http://katiedj.com (which I built as well). The code to build a server like katiedj.com is open-source https://github.com/pgrandinetti/katiedj
Feel free to open an issue in that project, if you need help creating your own server.
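
For readers who do need to send from the client side: the gist only listens, but the connection object returned by websockets.connect is bidirectional and has a send() coroutine. One possible approach, sketched here under assumptions, is to drain an outgoing queue next to the listener loop. The names send_outgoing, outgoing and the None sentinel are hypothetical, and FakeWS is a test double so that the example runs without a server.

```python
import asyncio


async def send_outgoing(ws, outgoing):
    """Forward queued messages through ws until None is queued."""
    while True:
        message = await outgoing.get()
        if message is None:  # hypothetical sentinel meaning "stop sending"
            break
        await ws.send(message)


class FakeWS:
    """Test double that records what would have been sent to the server."""

    def __init__(self):
        self.sent = []

    async def send(self, message):
        self.sent.append(message)


async def demo():
    ws = FakeWS()
    outgoing = asyncio.Queue()
    for message in ('hello', 'world', None):
        outgoing.put_nowait(message)
    await send_outgoing(ws, outgoing)
    return ws.sent
```

In the gist, such a coroutine could be started as a task inside the async with websockets.connect(...) block and cancelled when the listener loop breaks; that wiring is left out here and would need testing against a real server.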

@silegon

silegon commented Dec 21, 2020

> thank you for this example. this example has a callback when receive msg, but if i want to send data, how to get the ws client?

I had the same confusion and realized later.
For sending information, an additional http synchronization request is a better choice.

@pgrandinetti
Author

pgrandinetti commented Dec 22, 2020

> > thank you for this example. this example has a callback when receive msg, but if i want to send data, how to get the ws client?
>
> I had the same confusion and realized later.
> For sending information, an additional http synchronization request is a better choice.

@silegon Can you suggest a way to make this clearer?

@coccoinomane

Thanks mate for this snippet!
It seems that the feature has been implemented in websockets using async for:

async for websocket in websockets.connect(...):
    try:
        ...
    except websockets.ConnectionClosed:
        continue

Would you say that your snippet is still useful compared to the official solution?

Thanks,
Cocco

@pgrandinetti
Author

@coccoinomane It appears that async for in websockets was implemented in response to a very old ticket that I opened, python-websockets/websockets#414. I therefore expect it to implement the same logic as this gist, or maybe a better one!
Short answer: Use async for as provided in websockets.
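
A fuller version of the async for pattern quoted above might look like the sketch below. It assumes websockets 10.0 or later (where connect can be used as an async iterator); the function name listen_forever and its callback parameter mirror the gist but are otherwise an assumption, not the library's recommended code.

```python
import asyncio


async def listen_forever(url, callback):
    # Imported here so this module can be loaded even where the third-party
    # websockets package is not installed (version 10.0 or later is assumed).
    import websockets

    # Used as an async iterator, connect() yields a new connection each time
    # the loop body finishes or is left with `continue`.
    async for ws in websockets.connect(url):
        try:
            # Iterating a connection yields incoming messages until it closes.
            async for message in ws:
                callback(message)
        except websockets.ConnectionClosed:
            # Connection dropped with an error: move on to the next connection.
            continue
```

With a real server this would be started with asyncio.run(listen_forever('ws://katiedj.com:8080/macro/sample/', callback_fn)), reusing the URL and callback from the gist, and it runs until interrupted.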

@coccoinomane

Great, thanks! Have a nice day :-)
