Skip to content

Instantly share code, notes, and snippets.

@filwaline
Last active November 25, 2020 09:41
Show Gist options
  • Save filwaline/37309797265079f2ad7fcc47999575b2 to your computer and use it in GitHub Desktop.
Save filwaline/37309797265079f2ad7fcc47999575b2 to your computer and use it in GitHub Desktop.
websockets client retry attempts
# Standard Library
import asyncio
import random
# Third Party Library
import websockets
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
class WebSocketRetry:
def __init__(self, uri, timeout=3):
"""
IF __INIT__ WAS CALLED DIRECTLY
THEN GENERATOR MUST SETUP MANUALLY
"""
self.client = self.get_websocket_connection(uri)
self.timeout = timeout
self._flag_closed = False
@property
def closed(self):
return self._flag_closed
@classmethod
async def create(cls, uri, timeout=3):
self = cls(uri, timeout=timeout)
await self.client.asend(None)
return self
async def get_websocket_connection(self, uri):
while not self.closed:
try:
print("Create New Connection...")
async with websockets.connect(uri) as ws:
self._websocket_connection = ws
data = yield
while True:
await ws.send(data)
data = yield await asyncio.wait_for(ws.recv(), self.timeout)
except (asyncio.TimeoutError, ConnectionClosedOK, ConnectionClosedError):
pass
async def send(self, data):
if self.closed:
await self._websocket_connection.ping()
if (r := await self.client.asend(data)) :
return r
else:
# timeout retry
return await self.client.asend(data)
async def close(self):
if not self.closed:
self._flag_closed = True
await self._websocket_connection.close()
print("Connection closed.")
class WebSocketRetryConnectionPool:
def __init__(self, uri, timeout=3, max_size=6) -> None:
self.queue: asyncio.Queue[WebSocketRetry] = asyncio.Queue(maxsize=max_size)
self.uri = uri
self.timeout = timeout
self.num_connection = 0
async def get_client(self):
try:
return await asyncio.wait_for(
self.queue.get(), 0.1 * (self.num_connection ** 2) + random.random()
) # prevent timeout concurrently
except asyncio.TimeoutError:
self.num_connection += 1
return await WebSocketRetry.create(self.uri, self.timeout)
async def put_client(self, item: WebSocketRetry):
try:
self.queue.put_nowait(item)
except asyncio.QueueFull:
await self.del_client(item)
async def del_client(self, item: WebSocketRetry):
await item.close()
del item
self.num_connection -= 1
async def send(self, data):
client = await self.get_client()
try:
result = await client.send(data)
except StopAsyncIteration:
await self.del_client(client)
return await self.send(data)
else:
await self.put_client(client)
return result
@filwaline
Copy link
Author

filwaline commented Oct 31, 2020

Usage

In []: client = await WebSocketRetry.create("ws://localhost/echo")
Create New Connection...

In []: await client.send("hello")
Out[]: 'ECHO: hello'

# if server was down or network error...
In []: await client.send("hi")
code = 1006 (connection closed abnormally [internal]), no reason
Create New Connection...

# if server can accept connection again ...
In []: await client.send("hi")
Out[]: 'ECHO: hi'

In []: await client.close()

In []: await client.send("hi?")
#  ConnectionClosedError will raise

@filwaline
Copy link
Author

filwaline commented Nov 9, 2020

Concurrently call send method will crash.

In []: client = await WebSocketRetry.create("ws://localhost/echo")
Create New Connection...

In []: await asyncio.gather(client.send("hi"), client.send("hello"))
# RuntimeError: anext(): asynchronous generator is already running

Maintain a connection pool will help.

In []: pool = WebSocketRetryConnectionPool('ws://localhost/echo', max_size=6)

In []: await asyncio.gather(*[pool.send(f"sleep:{random.random()}") for i in range(9)])
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
code = 1011 (unexpected error), no reason
Create New Connection...
Connection closed.
Connection closed.
Connection closed.
Out[]:
['ECHO: sleep:0.36816088704040484',
 'ECHO: sleep:0.261342905119351',
 'ECHO: sleep:0.021739125526603176',
 'ECHO: sleep:0.1854844380496361',
 'ECHO: sleep:0.3522426997801139',
 'ECHO: sleep:0.48079726067202344',
 'ECHO: sleep:0.9158652217304699',
 'ECHO: sleep:0.0916119572655818',
 'ECHO: sleep:0.4528050067088306']

In []: await asyncio.gather(*[pool.send(f"sleep:{random.random()}") for i in range(9)])
Create New Connection...
Create New Connection...
Create New Connection...
Connection closed.
Connection closed.
Connection closed.
Out[]:
['ECHO: sleep:0.524064282869432',
 'ECHO: sleep:0.9945002904152017',
 'ECHO: sleep:0.7692045239383397',
 'ECHO: sleep:0.4888025348355699',
 'ECHO: sleep:0.5231339368495661',
 'ECHO: sleep:0.5538406043012046',
 'ECHO: sleep:0.5632313309783386',
 'ECHO: sleep:0.5603640559759943',
 'ECHO: sleep:0.0008147336463586452']

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment