Last active
March 27, 2018 19:43
-
-
Save boneyard93501/b4aaf612513fb07ed5c4664eab6b2dd2 to your computer and use it in GitHub Desktop.
threaded ws client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf8 -*- | |
import sys | |
import time | |
import json | |
# import signal | |
import threading | |
from collections import deque | |
import asyncio | |
try: | |
import uvloop | |
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) | |
except: # noqa E722 | |
pass | |
import websockets | |
THREAD_COUNTER = [] | |
WS_COUNTER = [] | |
def threaded_ws_loop_handler(loop): | |
asyncio.set_event_loop(loop) | |
loop.run_forever() | |
def async_ws_thread_closer(t,loop): | |
try: | |
pass | |
# asyncio.set_event_loop(loop) | |
# loop.stop() | |
# pending = asyncio.Task.all_tasks(loop=loop) | |
# loop.run_until_complete(asyncio.gather(*pending)) # this isn't working. f'ing hell. | |
# asyncio.ensure_future((asyncio.gather(*pending))) # nope but this actually shouldn't | |
# loop.close() | |
except Exception as e: | |
# pass # log something ? | |
print(f'error: {e}') | |
finally: | |
# DEBUG | |
if (t,loop) in THREAD_COUNTER: | |
THREAD_COUNTER.remove((t,loop)) | |
def async_ws_thread_kicker(loop=None): | |
if loop is None: | |
loop = asyncio.new_event_loop() | |
t = threading.Thread(target=threaded_ws_loop_handler,args=(loop,)) | |
t.setDaemon(True) | |
t.start() | |
# DEBUG: | |
THREAD_COUNTER.append((t,loop)) | |
return t, loop | |
class WSHandler: | |
''' websocket(s) client request handler ''' | |
_loop = None | |
_loop_thread = None | |
# _thread_event = None # don't need if daemonized thread | |
_ws_registry = None | |
def __init__(self, uri): | |
self.uri = uri | |
if WSHandler._loop is None: | |
WSHandler._loop_thread, WSHandler._loop = async_ws_thread_kicker() | |
WSHandler._thread_event = threading.Event() | |
if WSHandler._ws_registry is None: | |
WSHandler._ws_registry = deque() | |
def __del__(self): | |
# unless the closer function is working, might as well leave it all to GC | |
if WSHandler._loop is not None: | |
async_ws_thread_closer(WSHandler._loop_thread, WSHandler._loop) | |
WSHandler._loop = None | |
WSHandler._loop_thread = None | |
async def ws_call(self, payload): | |
try: | |
async with websockets.connect(uri=self.uri, loop=WSHandler._loop) as ws: | |
await ws.send(payload) | |
res = await ws.recv() | |
return res | |
except Exception as e: | |
# TODO: be explcit. use InvalidHandshake, InvalidUri, Timeout | |
raise Exception('ws call failure for {}: {}'.format(payload, e)) | |
def make_request(self, payload): | |
# TODO: bottle up exc's properly | |
payload = json.dumps(payload) | |
res = asyncio.run_coroutine_threadsafe(self.ws_call(payload), WSHandler._loop) | |
return res.result() | |
# testing for parity only | |
def eye_ball_test(uri='ws://127.0.0.1:8546', n_ws=100): | |
def req_maker(tid, uri): | |
payloads = [{"jsonrpc": "2.0", "method": "eth_protocolVersion", "params": [], "id": 10}, | |
{"jsonrpc": "2.0", "method": "eth_syncing", "params": [], "id": 20}, | |
{"jsonrpc": "2.0", "method": "net_version", "params": [], "id": 30} | |
] | |
ws = WSHandler(uri) | |
for payload in payloads: | |
result = ws.make_request(payload) | |
print(f'make_req id {tid}, result {result} for payload {payload}') | |
time.sleep(tid * 0.05) | |
# easiest to init the first instance rather than add _counter to class | |
ws = WSHandler(uri) | |
req_threads = [] | |
for i in range(n_ws): | |
t = threading.Thread(target=req_maker,args=(i + 1,uri)) | |
req_threads.append(t) | |
[t.start() for t in req_threads] | |
print(len(THREAD_COUNTER)) | |
time.sleep(2.0) | |
[t.join() for t in req_threads] | |
print(len(THREAD_COUNTER)) | |
del ws # out init instance | |
print(len(THREAD_COUNTER)) | |
if __name__ == '__main__': | |
eye_ball_test() | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment