Last active
November 25, 2020 09:41
-
-
Save filwaline/37309797265079f2ad7fcc47999575b2 to your computer and use it in GitHub Desktop.
websockets client retry attempts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard Library | |
import asyncio | |
import random | |
# Third Party Library | |
import websockets | |
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK | |
class WebSocketRetry:
    """WebSocket client that reconnects and resends once after a failure.

    The connection is owned by an async generator stored in ``self.client``.
    Each ``asend(data)`` on it sends ``data`` and yields the next received
    message.  When receiving times out, or the connection is closed, the
    generator opens a new connection and yields ``None`` so that the caller
    can resend.

    A single instance must not be used by concurrent ``send`` calls: an async
    generator cannot be resumed while it is already running.
    """

    def __init__(self, uri, timeout=3):
        """Prepare the client without connecting.

        If ``__init__`` is called directly, the generator must be primed
        manually with ``await self.client.asend(None)``; ``create`` does
        this for you.

        Args:
            uri: Address handed to ``websockets.connect``.
            timeout: Seconds to wait for a reply before reconnecting.
        """
        # Creating the generator object does not run any of its body yet.
        self.client = self.get_websocket_connection(uri)
        self.timeout = timeout
        self._flag_closed = False
        # Filled in by the generator once a connection has been opened.
        self._websocket_connection = None

    @property
    def closed(self):
        """Whether ``close`` has been called on this client."""
        return self._flag_closed

    @classmethod
    async def create(cls, uri, timeout=3):
        """Build a client and prime its generator so it is ready to send.

        Args:
            uri: Address handed to ``websockets.connect``.
            timeout: Seconds to wait for a reply before reconnecting.

        Returns:
            The ready-to-use ``WebSocketRetry`` instance.
        """
        self = cls(uri, timeout=timeout)
        # Advance to the first bare ``yield``, which opens the connection.
        await self.client.asend(None)
        return self

    async def get_websocket_connection(self, uri):
        """Async generator that owns the connection and reconnects on failure.

        Protocol: the value passed to ``asend`` is sent over the socket and
        the received reply is yielded back.  After a (re)connect the
        generator yields ``None`` and waits for the data to send.
        The generator finishes once the client has been closed.
        """
        while not self.closed:
            try:
                print("Create New Connection...")
                async with websockets.connect(uri) as ws:
                    self._websocket_connection = ws
                    # Yields None: signals "connected, give me data".
                    data = yield
                    while True:
                        await ws.send(data)
                        data = yield await asyncio.wait_for(ws.recv(), self.timeout)
            except (asyncio.TimeoutError, ConnectionClosedOK, ConnectionClosedError):
                # Deliberate: drop the failed connection and loop to reconnect.
                pass

    async def send(self, data):
        """Send ``data`` and return the reply, resending once after a reconnect.

        Args:
            data: Message to send.

        Returns:
            The received message.  ``None`` if the single resend also ended
            in a reconnect.

        Raises:
            StopAsyncIteration: If the underlying generator has finished.
        """
        if self.closed:
            # NOTE(review): presumably pinging the closed connection raises
            # and thereby reports the closed state -- confirm with websockets.
            await self._websocket_connection.ping()
        reply = await self.client.asend(data)
        # Compare against None explicitly: an empty message ("" or b"") is a
        # valid reply and must not be mistaken for the reconnect marker.
        if reply is not None:
            return reply
        # None means the generator reconnected; resend the data once.
        return await self.client.asend(data)

    async def close(self):
        """Mark the client closed and close the current connection, if any.

        Calling it again after the first call does nothing.
        """
        if self.closed:
            return
        self._flag_closed = True
        # No connection exists if the generator was never primed.
        if self._websocket_connection is not None:
            await self._websocket_connection.close()
        print("Connection closed.")
class WebSocketRetryConnectionPool:
    """Pool of ``WebSocketRetry`` clients so that sends can run concurrently.

    Idle clients wait in a bounded queue.  A caller that cannot obtain an
    idle client within a short, randomised delay opens a new connection.
    """

    def __init__(self, uri, timeout=3, max_size=6) -> None:
        """Create an empty pool.

        Args:
            uri: Address passed to every new ``WebSocketRetry``.
            timeout: Reply timeout passed to every new ``WebSocketRetry``.
            max_size: Maximum number of idle clients kept in the queue.
        """
        self.queue: "asyncio.Queue[WebSocketRetry]" = asyncio.Queue(maxsize=max_size)
        self.uri = uri
        self.timeout = timeout
        # Count of clients created by this pool and not yet deleted.
        self.num_connection = 0

    async def get_client(self):
        """Return an idle client, or create a new one if none frees up in time.

        The wait grows with the square of the number of open connections
        and includes a random part so that concurrent callers do not all
        time out at the same moment.
        """
        wait = 0.1 * (self.num_connection ** 2) + random.random()
        try:
            return await asyncio.wait_for(self.queue.get(), wait)
        except asyncio.TimeoutError:
            # Count the connection before awaiting so that concurrent callers
            # already see it while it is being established.
            self.num_connection += 1
            try:
                return await WebSocketRetry.create(self.uri, self.timeout)
            except BaseException:
                # The connection was never created; undo the count.
                self.num_connection -= 1
                raise

    async def put_client(self, item: "WebSocketRetry"):
        """Return ``item`` to the idle queue, or close it if the queue is full."""
        try:
            self.queue.put_nowait(item)
        except asyncio.QueueFull:
            await self.del_client(item)

    async def del_client(self, item: "WebSocketRetry"):
        """Close ``item`` and remove it from the connection count."""
        await item.close()
        self.num_connection -= 1

    async def send(self, data):
        """Send ``data`` through a pooled client and return the reply.

        A client whose generator has finished (``StopAsyncIteration``) is
        discarded and the send is retried with another client.  Any other
        error also discards the client, then propagates to the caller.
        """
        while True:
            client = await self.get_client()
            try:
                result = await client.send(data)
            except StopAsyncIteration:
                # Exhausted client: drop it and try again with another one.
                await self.del_client(client)
            except Exception:
                # Do not leak the failed client or its slot in the count.
                await self.del_client(client)
                raise
            else:
                await self.put_client(client)
                return result
Calling the `send` method concurrently on a single client will crash.
In []: client = await WebSocketRetry.create("ws://localhost/echo")
Create New Connection...
In []: await asyncio.gather(client.send("hi"), client.send("hello"))
# RuntimeError: anext(): asynchronous generator is already running
Maintaining a connection pool helps.
In []: pool = WebSocketRetryConnectionPool('ws://localhost/echo', max_size=6)
In []: await asyncio.gather(*[pool.send(f"sleep:{random.random()}") for i in range(9)])
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
Create New Connection...
code = 1011 (unexpected error), no reason
Create New Connection...
Connection closed.
Connection closed.
Connection closed.
Out[]:
['ECHO: sleep:0.36816088704040484',
'ECHO: sleep:0.261342905119351',
'ECHO: sleep:0.021739125526603176',
'ECHO: sleep:0.1854844380496361',
'ECHO: sleep:0.3522426997801139',
'ECHO: sleep:0.48079726067202344',
'ECHO: sleep:0.9158652217304699',
'ECHO: sleep:0.0916119572655818',
'ECHO: sleep:0.4528050067088306']
In []: await asyncio.gather(*[pool.send(f"sleep:{random.random()}") for i in range(9)])
Create New Connection...
Create New Connection...
Create New Connection...
Connection closed.
Connection closed.
Connection closed.
Out[]:
['ECHO: sleep:0.524064282869432',
'ECHO: sleep:0.9945002904152017',
'ECHO: sleep:0.7692045239383397',
'ECHO: sleep:0.4888025348355699',
'ECHO: sleep:0.5231339368495661',
'ECHO: sleep:0.5538406043012046',
'ECHO: sleep:0.5632313309783386',
'ECHO: sleep:0.5603640559759943',
'ECHO: sleep:0.0008147336463586452']
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage