Skip to content

Instantly share code, notes, and snippets.

@boneyard93501
Last active March 27, 2018 19:43
Show Gist options
  • Save boneyard93501/b4aaf612513fb07ed5c4664eab6b2dd2 to your computer and use it in GitHub Desktop.
Save boneyard93501/b4aaf612513fb07ed5c4664eab6b2dd2 to your computer and use it in GitHub Desktop.
threaded ws client
#!/usr/bin/env python3
# -*- coding: utf8 -*-
import sys
import time
import json
# import signal
import threading
from collections import deque
import asyncio
# uvloop is an optional accelerator: when it is installed, make it the
# event-loop policy; when it is not, keep the stock asyncio loop.
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    # Deliberate best-effort: only a missing uvloop is tolerated, so that
    # KeyboardInterrupt/SystemExit and genuine errors are no longer swallowed.
    pass
import websockets
# DEBUG bookkeeping: (thread, loop) pairs appended by async_ws_thread_kicker
# and removed again by async_ws_thread_closer.
THREAD_COUNTER = []
# Not referenced anywhere else in the visible source.
WS_COUNTER = []
def threaded_ws_loop_handler(loop):
    '''Thread target: drive ``loop`` in the calling thread until it is stopped.

    Args:
        loop: asyncio event loop to install as the calling thread's current
            loop and to run.

    The call blocks inside ``run_forever()`` until ``loop.stop()`` takes
    effect. The loop is then closed so the resources it holds are released
    instead of lingering until garbage collection.
    '''
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        # run_forever() raises if the loop is already being driven by another
        # thread; in that case it is not ours to close.
        if not loop.is_running() and not loop.is_closed():
            loop.close()
def async_ws_thread_closer(t, loop, timeout=5.0):
    '''Stop the event loop driven by thread ``t`` and release it.

    Args:
        t: thread running ``loop`` (may be ``None``).
        loop: asyncio event loop to shut down (may be ``None``).
        timeout: maximum number of seconds to wait for ``t`` to finish.

    The shutdown is requested through ``call_soon_threadsafe`` because
    ``loop.stop()`` and ``run_until_complete()`` must not be called from a
    thread other than the one running the loop. Tasks still pending on the
    loop are cancelled first, so callers blocked on their results are
    released rather than left waiting on a stopped loop.

    Errors are reported with ``print`` and never propagated. The
    ``(t, loop)`` entry is always removed from ``THREAD_COUNTER``.
    '''
    def _cancel_then_stop():
        # Runs on the loop's own thread.
        # asyncio.all_tasks exists from Python 3.7; fall back to the older
        # Task.all_tasks spelling on 3.6.
        all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
        pending = [task for task in all_tasks(loop) if not task.done()]
        if not pending:
            loop.stop()
            return
        for task in pending:
            task.cancel()
        # Let the cancellations unwind before stopping the loop.
        waiter = asyncio.gather(*pending, return_exceptions=True)
        waiter.add_done_callback(lambda _fut: loop.stop())

    try:
        if loop is None or loop.is_closed():
            return
        thread_alive = t is not None and t.is_alive()
        if thread_alive or loop.is_running():
            # Also covers a thread that has started but not yet entered
            # run_forever(): the callback runs as soon as the loop starts.
            loop.call_soon_threadsafe(_cancel_then_stop)
            if thread_alive and t is not threading.current_thread():
                t.join(timeout)
        # Only close once nothing is driving the loop any more; when called
        # from the loop thread itself (or after a join timeout) it is left
        # open.
        still_driven = loop.is_running() or (t is not None and t.is_alive())
        if not still_driven and not loop.is_closed():
            loop.close()
    except Exception as e:
        # pass # log something ?
        print(f'error: {e}')
    finally:
        # DEBUG
        try:
            THREAD_COUNTER.remove((t, loop))
        except ValueError:
            # Entry was never registered or has already been removed.
            pass
def async_ws_thread_kicker(loop=None):
    '''Start a daemon thread that runs an asyncio event loop.

    Args:
        loop: event loop to run; a new one is created when ``None``.

    Returns:
        A ``(thread, loop)`` tuple. The same tuple is appended to
        ``THREAD_COUNTER`` for debugging.
    '''
    if loop is None:
        loop = asyncio.new_event_loop()
    # Pass daemon=True to the constructor: Thread.setDaemon() is deprecated
    # since Python 3.10.
    t = threading.Thread(
        target=threaded_ws_loop_handler,
        args=(loop,),
        daemon=True,
    )
    t.start()
    # DEBUG:
    THREAD_COUNTER.append((t, loop))
    return t, loop
class WSHandler:
    '''Websocket client request handler.

    All instances share one asyncio event loop running in a background
    thread. The loop is started when the first handler is created and torn
    down when the last live handler is closed or garbage-collected, so
    deleting one handler no longer pulls the loop out from under the others.
    '''
    # Shared asyncio loop and the thread driving it (None while not running).
    _loop = None
    _loop_thread = None
    # Created together with the loop; not used elsewhere in this class.
    _thread_event = None
    # Shared deque, created on first use; not populated by this class.
    _ws_registry = None
    # Number of live handlers currently holding the shared loop.
    _instances = 0
    # Re-entrant so a finalizer triggered by the garbage collector while this
    # thread already holds the lock cannot deadlock.
    _lock = threading.RLock()

    def __init__(self, uri):
        '''Register a handler for ``uri`` and start the shared loop if needed.'''
        self.uri = uri
        self._registered = False
        with WSHandler._lock:
            # Count first so a concurrent close() cannot tear the loop down
            # between the check below and this handler's registration.
            WSHandler._instances += 1
            self._registered = True
            if WSHandler._loop is None:
                WSHandler._loop_thread, WSHandler._loop = async_ws_thread_kicker()
                WSHandler._thread_event = threading.Event()
            if WSHandler._ws_registry is None:
                WSHandler._ws_registry = deque()

    def close(self):
        '''Release this handler's hold on the shared loop.

        Safe to call more than once. The loop is shut down only when the
        last registered handler is closed.
        '''
        if not getattr(self, '_registered', False):
            return
        thread = loop = None
        with WSHandler._lock:
            if self._registered:
                self._registered = False
                WSHandler._instances -= 1
                if WSHandler._instances <= 0 and WSHandler._loop is not None:
                    thread, loop = WSHandler._loop_thread, WSHandler._loop
                    WSHandler._loop = None
                    WSHandler._loop_thread = None
        if loop is not None:
            # Outside the lock: the closer may block while the thread ends.
            async_ws_thread_closer(thread, loop)

    def __del__(self):
        try:
            self.close()
        except Exception:
            # Finalizers must not raise (e.g. during interpreter shutdown).
            pass

    async def ws_call(self, payload):
        '''Open a connection to ``self.uri``, send ``payload``, return one reply.

        Raises:
            Exception: wrapping any failure, with the original error chained
                as ``__cause__``.
        '''
        try:
            # No explicit loop argument: this coroutine already runs on the
            # shared loop. The argument is deprecated since websockets 10
            # and, as far as I know, rejected by its newer asyncio client.
            async with websockets.connect(self.uri) as ws:
                await ws.send(payload)
                return await ws.recv()
        except Exception as e:
            # TODO: be explicit. use InvalidHandshake, InvalidUri, Timeout
            raise Exception('ws call failure for {}: {}'.format(payload, e)) from e

    def make_request(self, payload, timeout=None):
        '''Send ``payload`` as JSON over the shared loop and wait for the reply.

        Args:
            payload: JSON-serialisable object.
            timeout: seconds to wait for the reply; ``None`` waits forever.

        Returns:
            Whatever ``ws_call`` returns for the encoded payload.

        Raises:
            RuntimeError: if the shared event loop is not running.
            Exception: any failure raised by ``ws_call``; a timeout from
                waiting on the result also propagates.
        '''
        # TODO: bottle up exc's properly
        payload = json.dumps(payload)
        loop = WSHandler._loop
        if loop is None:
            raise RuntimeError('WSHandler event loop is not running')
        future = asyncio.run_coroutine_threadsafe(self.ws_call(payload), loop)
        try:
            return future.result(timeout)
        finally:
            # On timeout or interruption, do not leave the call running.
            if not future.done():
                future.cancel()
# testing for parity only
def eye_ball_test(uri='ws://127.0.0.1:8546', n_ws=100):
    '''Manual smoke test: fire JSON-RPC requests from ``n_ws`` threads.

    Each worker thread builds its own WSHandler for ``uri``, sends three
    JSON-RPC payloads and prints every reply. The size of THREAD_COUNTER is
    printed after the workers start, after they are joined, and after the
    initial handler is deleted.
    '''
    def req_maker(tid, uri):
        handler = WSHandler(uri)
        requests = (
            {"jsonrpc": "2.0", "method": "eth_protocolVersion", "params": [], "id": 10},
            {"jsonrpc": "2.0", "method": "eth_syncing", "params": [], "id": 20},
            {"jsonrpc": "2.0", "method": "net_version", "params": [], "id": 30},
        )
        for payload in requests:
            result = handler.make_request(payload)
            print(f'make_req id {tid}, result {result} for payload {payload}')
            # NOTE(review): indentation was lost in the source; the pause is
            # assumed to follow every request rather than the whole loop.
            time.sleep(tid * 0.05)

    # easiest to init the first instance rather than add _counter to class
    keepalive = WSHandler(uri)
    workers = [
        threading.Thread(target=req_maker, args=(idx + 1, uri))
        for idx in range(n_ws)
    ]
    for worker in workers:
        worker.start()
    print(len(THREAD_COUNTER))
    time.sleep(2.0)
    for worker in workers:
        worker.join()
    print(len(THREAD_COUNTER))
    del keepalive  # out init instance
    print(len(THREAD_COUNTER))
if __name__ == '__main__':
    # Script entry point: run the manual smoke test with its defaults, then
    # exit with status 0.
    eye_ball_test()
    sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment