Skip to content

Instantly share code, notes, and snippets.

@Anton-2
Last active September 8, 2024 23:21
Show Gist options
  • Save Anton-2/2f2919d402a95823a22d378455ec9ab3 to your computer and use it in GitHub Desktop.
Save Anton-2/2f2919d402a95823a22d378455ec9ab3 to your computer and use it in GitHub Desktop.
from websockets.asyncio import client as wscli
from itertools import count
import asyncio
import orjson
class DBError(Exception):
    """Error reported by the database in an RPC response.

    Attributes:
        code: Numeric error code taken from the response.
        message: Human-readable error text taken from the response.
    """

    code: int
    message: str

    def __init__(self, code, message) -> None:
        """Store the error *code* and *message* on the instance."""
        self.code, self.message = code, message

    def __str__(self) -> str:
        """Render the error as ``"<message> (#<code>)"``."""
        text = f"{self.message} (#{self.code})"
        return text
class LiveResult:
    """Asynchronous iterator over the values pushed for one live query.

    Values are buffered in an unbounded ``asyncio.Queue``. Iteration never
    raises ``StopAsyncIteration``: ``__anext__`` simply waits until the next
    value is available.

    Attributes:
        uuid: Identifier this result was created with.
        queue: Buffer holding values that have not been consumed yet.
    """

    def __init__(self, uuid):
        """Create an empty result stream identified by *uuid*."""
        self.uuid = uuid
        self.queue = asyncio.Queue()

    def __aiter__(self):
        """Return the object itself; it is its own async iterator."""
        return self

    async def __anext__(self):
        """Wait for and return the oldest buffered value."""
        item = await self.queue.get()
        return item

    def put(self, value):
        """Append *value* to the buffer without waiting."""
        self.queue.put_nowait(value)
class Surreal:
    """Small asyncio RPC client speaking JSON over a WebSocket.

    Each command is sent as ``{"id", "method", "params"}`` to ``<server>/rpc``
    and matched to its response by ``id``. Messages that arrive without an
    ``id`` are treated as live-query notifications and routed to the
    ``LiveResult`` registered under the notification's ``result["id"]``.
    (The class name and the ``LIVE`` prefix suggest a SurrealDB server --
    confirm against the deployment.)

    Attributes:
        url: WebSocket endpoint, ``f"{server}/rpc"``.
        cnt: Iterator producing command ids, starting at 1.
        pending: Command id -> future awaiting that command's response.
        lives: Live-query uuid -> ``LiveResult`` receiving its notifications.
        loop: Event loop captured by ``connect`` (``None`` before that).
        ws: Open WebSocket connection (``None`` when not connected).
        rtask: Background task running ``recv`` (``None`` when not connected).

    The client can be used as an async context manager: entering connects,
    leaving closes.
    """

    def __init__(self, server="ws://localhost:8000"):
        """Prepare a client for *server*; no I/O happens until ``connect``."""
        self.url = f"{server}/rpc"
        self.cnt = count(1)
        self.pending = {}
        self.lives = {}
        # Ids of in-flight commands whose response must become a LiveResult.
        self._live_cmds = set()
        self.loop = None
        self.ws = None
        self.rtask = None

    async def __aenter__(self):
        """Connect and return the client."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the connection when leaving the ``async with`` block."""
        await self.close()

    async def connect(self):
        """Open the WebSocket and start the background reader task."""
        self.ws = await wscli.connect(self.url)
        self.loop = asyncio.get_running_loop()
        self.rtask = self.loop.create_task(self.recv())

    async def close(self):
        """Stop the reader task and close the WebSocket.

        Commands still waiting for a response fail with ``ConnectionError``.
        Calling this on a client that is not connected does nothing.
        """
        task, self.rtask = self.rtask, None
        if task is not None:
            task.cancel()
            # return_exceptions=True collects the reader's CancelledError (or
            # the error that already killed it) instead of raising it here.
            await asyncio.gather(task, return_exceptions=True)
        ws, self.ws = self.ws, None
        if ws is not None:
            await ws.close()

    def handle_live(self, uuid, result):
        """Queue the notification *result* on the live query *uuid*.

        A notification for an unknown uuid is reported on stdout and dropped.
        """
        live = self.lives.get(uuid)
        if live is None:
            print(f'No Live for {uuid}')
            return
        live.put(result)

    def _fail_pending(self, exc):
        """Fail every unresolved pending command with *exc* and forget them."""
        waiting = list(self.pending.values())
        self.pending.clear()
        self._live_cmds.clear()
        for fut in waiting:
            if not fut.done():
                fut.set_exception(exc)

    def _outcome(self, data, is_live):
        """Turn a decoded response into the value its command resolves to.

        Raises:
            DBError: the response carries a truthy ``"error"`` entry.
            KeyError, IndexError, TypeError: the response lacks the expected
                ``"result"`` structure.
        """
        error = data.get("error")
        if error:
            raise DBError(error.get("code", 0), error.get("message", "?"))
        result = data["result"]
        if not is_live:
            return result
        live_uuid = result[0]["result"]
        live = LiveResult(live_uuid)
        # Register here, in the reader, before the caller is resumed, so a
        # notification that follows the response immediately is not dropped.
        self.lives[live_uuid] = live
        return live

    def _dispatch(self, data, raw):
        """Route one decoded message; *raw* is only used for diagnostics."""
        if not isinstance(data, dict):
            print(f"recv bad msg {raw}")
            return
        cmdid = data.get("id")
        if cmdid is None:
            # No command id: expected to be a live-query notification.
            result = data.get("result")
            live_uuid = result.get("id") if isinstance(result, dict) else None
            if live_uuid is not None:
                self.handle_live(live_uuid, result)
            else:
                print(f"recv bad msg {raw}")
            return
        fut = self.pending.pop(cmdid, None)
        is_live = cmdid in self._live_cmds
        self._live_cmds.discard(cmdid)
        if fut is None:
            print(f'No Future for {raw}')
            return
        if fut.done():
            # The caller was cancelled; nobody is waiting for this response.
            return
        try:
            outcome = self._outcome(data, is_live)
        except Exception as exc:
            # A database error or malformed response fails this command only;
            # the reader keeps serving the other commands.
            fut.set_exception(exc)
        else:
            fut.set_result(outcome)

    async def recv(self):
        """Read messages forever and dispatch them (runs as ``rtask``).

        When the loop ends -- connection error or cancellation -- every
        pending command is failed so that no caller waits forever. Live
        iterators are not woken: they only ever wait on their queue.
        """
        try:
            while True:
                raw = await self.ws.recv(decode=False)
                try:
                    data = orjson.loads(raw)
                except ValueError:
                    # orjson.JSONDecodeError is a ValueError subclass.
                    print(f"recv bad msg {raw}")
                    continue
                self._dispatch(data, raw)
        except asyncio.CancelledError:
            self._fail_pending(ConnectionError("connection closed"))
            raise
        except Exception as exc:
            self._fail_pending(exc)
            raise

    async def cmd(self, method: str, *params, live=False):
        """Send one RPC command and wait for its response.

        Args:
            method: RPC method name.
            *params: Positional parameters sent as the ``params`` array.
            live: When true, the first statement's result is taken as a
                live-query uuid and a ``LiveResult`` is returned.

        Returns:
            The response's ``"result"`` value, or a ``LiveResult`` if *live*.

        Raises:
            DBError: the server answered with an error.
        """
        cmdid = next(self.cnt)
        fut = self.loop.create_future()
        self.pending[cmdid] = fut
        if live:
            self._live_cmds.add(cmdid)
        try:
            payload = orjson.dumps(dict(id=cmdid, method=method, params=params))
            await self.ws.send(str(payload, "utf-8"))
            return await fut
        finally:
            # No-op after a normal response (the reader already removed the
            # entry); cleans up when sending failed or the caller was
            # cancelled.
            self.pending.pop(cmdid, None)
            self._live_cmds.discard(cmdid)

    async def query(self, query: str, **params):
        """Run a single-statement query and return that statement's result.

        Keyword arguments are sent as the query's parameters.

        Raises:
            AssertionError: the response does not contain exactly one
                statement result.
        """
        response = await self.cmd('query', query, params)
        if len(response) != 1:
            # Explicit raise instead of `assert`, which `python -O` strips.
            raise AssertionError(
                f"expected exactly one statement result, got {len(response)}"
            )
        return response[0].get('result')

    async def live(self, query: str, **params):
        """Start ``LIVE <query>`` and return a ``LiveResult`` to iterate."""
        return await self.cmd('query', f"LIVE {query}", params, live=True)

    async def mquery(self, query: str, **params):
        """Run a multi-statement query; return one result per statement."""
        ret = await self.cmd('query', query, params)
        return [r.get('result') for r in ret]
@Anton-2
Copy link
Author

Anton-2 commented Sep 8, 2024

Example use:
image

Concurrency test:
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment