Skip to content

Instantly share code, notes, and snippets.

@voith
Last active September 10, 2019 06:11
Show Gist options
  • Save voith/e906d7ae486a7f183ba2fdc648ce26be to your computer and use it in GitHub Desktop.
Save voith/e906d7ae486a7f183ba2fdc648ce26be to your computer and use it in GitHub Desktop.
import asyncio
import json
import websockets
from web3.providers import WebsocketProvider
class CustomWebsocketProvider(WebsocketProvider):
    """
    WebsocketProvider variant that uses a short-lived connection per request.

    web3.py's stock WebsocketProvider keeps a single persistent websocket
    connection that stays open until the program exits. This subclass is a
    quick workaround: every request opens its own connection and closes it
    again as soon as the response has been read.

    Usage:
        from web3 import Web3
        w3 = Web3(CustomWebsocketProvider('<WS_URL>'))
    """

    async def coro_make_request(self, request_data):
        """
        Send ``request_data`` over a fresh websocket and return the parsed reply.

        The send and the receive are each bounded separately by
        ``self.websocket_timeout`` via ``asyncio.wait_for``; establishing the
        connection itself is not wrapped in a timeout here. The received
        message is decoded with ``json.loads`` and returned.
        """
        # A brand-new connection is opened on every call; leaving the
        # ``async with`` block is what closes it again.
        async with websockets.connect(
            self.endpoint_uri,
            loop=WebsocketProvider._loop,
        ) as connection:
            time_limit = self.websocket_timeout

            outgoing = connection.send(request_data)
            await asyncio.wait_for(outgoing, timeout=time_limit)

            incoming = connection.recv()
            raw_response = await asyncio.wait_for(incoming, timeout=time_limit)

            return json.loads(raw_response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment