Skip to content

Instantly share code, notes, and snippets.

@akvadrako
Last active April 28, 2022 16:52
Show Gist options
  • Save akvadrako/8ac132bfeeb5971d43772cc8b210404b to your computer and use it in GitHub Desktop.
Save akvadrako/8ac132bfeeb5971d43772cc8b210404b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
AceBase Python Client

usage:
    pip3 install websocket-client
    ./acebase-demo-client.py
"""
import http.client
import json
import re
from pathlib import Path
from logging import getLogger
# Host and port of the AceBase server; used for both the HTTP and WebSocket URLs.
ACEBASE_SERVER = 'localhost:920'
# Database name interpolated into the REST request paths.
dbname = 'default'
# Client identifier sent in the AceBase-Context header and the signin request.
client_id = 'py'
# Access token; main() replaces this with the contents of the token file.
token = ''
# Shared http.client.HTTPConnection; created by main().
conn = None
# Shared WebSocket connection; created by watch().
ws = None
# Logger used by every function in this script.
log = getLogger('acebase')
def main():
    """Set up logging, load the access token, connect over HTTP and start watching."""
    global token, conn

    import sys
    from logging import DEBUG, basicConfig

    # Put the project-local library directory first on the import path,
    # then call its main_preinit() before anything else is configured.
    sys.path[0:0] = ['/home/dev/repo/pylib']
    from lib.all import main_preinit
    main_preinit()

    basicConfig(level=DEBUG)

    # The secret token lives in a private file under the user's home directory.
    token_file = Path('~/.private/acebase.token').expanduser()
    token = token_file.read_text().strip()

    # Shared HTTP connection used by call().
    conn = http.client.HTTPConnection(ACEBASE_SERVER, timeout=2)

    # Call basics() here instead to exercise the REST endpoints.
    watch()
def call(method, path, body=None):
    """Perform an HTTP request to the AceBase server.

    Args:
        method: HTTP verb, e.g. ``'GET'`` or ``'PUT'``.
        path: Request path, including any query string.
        body: Optional JSON-serialisable payload; ``None`` sends no body.

    Returns:
        The JSON-decoded response body, or the raw bytes when the body is not
        valid JSON. When the decoded body is a dict and the response carries
        an ``AceBase-Context`` header, the decoded header is stored under the
        ``'ctx'`` key.

    A non-200 status is logged as an error but does not raise; the (possibly
    error-describing) body is still returned.
    """
    log.debug('%s %s %s', method, path, body)

    # Compare against None so falsy-but-valid payloads ({}, [], 0) are still sent.
    payload = json.dumps(body) if body is not None else None
    conn.request(
        method,
        path,
        body=payload,
        headers={
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {token}',
            'AceBase-Context': json.dumps(dict(client=client_id)),
        },
    )

    resp = conn.getresponse()
    resp_body = resp.read()
    log.debug('RESP BODY: %s', resp_body)

    try:
        resp_body = json.loads(resp_body)
    except ValueError:
        # Not JSON (JSONDecodeError and UnicodeDecodeError are both
        # ValueErrors): hand the raw bytes back to the caller.
        pass

    if resp.status != 200:
        log.error('ERROR RESP: %s %s %s', resp.status, resp.reason, resp_body)

    if isinstance(resp_body, dict):
        if ctx := resp.headers.get('AceBase-Context'):
            resp_body['ctx'] = json.loads(ctx)
    return resp_body
def walk(path, node):
    """Log *path* for every dict reachable from *node*, descending depth first."""
    if not isinstance(node, dict):
        # Leaves (anything that is not a dict) are not reported.
        return
    log.info("%s", path)
    for key, child in node.items():
        walk(Path(path) / key, child)
def basics():
    """Run one request against each basic REST endpoint and log the results.

    The calls are made in this order: signin, info, get, reflect, set (PUT),
    update (POST), then sync/changes by cursor and by timestamp.
    """
    from datetime import datetime
    from time import time
    from urllib.parse import quote

    def log_changes(changes):
        # One log line per change record returned by /sync/changes.
        for change in changes:
            log.info('change %s %s = %s', change['type'], change['path'], change['value'])

    ### AUTH
    signin_body = dict(
        client_id=client_id,
        method='token',
        access_token=token,
    )
    session = call('POST', f'/auth/{dbname}/signin', signin_body)
    log.info('login uid:%s', session["user"]["uid"])

    ### INFO
    server_info = call('GET', f'/info/{dbname}')
    log.info('info %s', server_info["version"])

    ### GET
    clients = call('GET', f'/data/{dbname}/clients')
    context = clients['ctx']
    log.info('got ctx:%s', context)
    # Cursor returned with this read; used below to ask for later changes.
    cursor = context['acebase_cursor']

    ### REFLECT
    reflection = call('GET', f'/reflect/{dbname}/?type=info&child_limit=10')
    children = reflection['children']
    log.info('reflect %s more:%s', reflection['key'], children['more'])
    for entry in children['list']:
        log.info(' child %s', entry["key"])

    ### SET
    record = dict(
        name=client_id,
        mtime=datetime.now().isoformat('T', 'milliseconds'),
    )
    call('PUT', f'/data/{dbname}/clients/py', dict(val=record))

    ### UPDATE
    record = dict(
        name=client_id,
        mtime=datetime.now().isoformat('T', 'milliseconds'),
    )
    call('POST', f'/data/{dbname}/clients/py2', dict(val=record))

    ### SYNC
    subscriptions = [
        dict(path='', events=['child_changed', 'value', 'notify_value', 'mutations', 'mutated'])
    ]
    for_param = quote(json.dumps(subscriptions))

    # Changes since the cursor obtained from the GET above.
    log_changes(call('GET', f'/sync/changes/{dbname}?for={for_param}&cursor={cursor}'))

    # Changes since one week ago, expressed in milliseconds.
    week_ago = time() - 24 * 3600 * 7
    stamp = int(week_ago) * 1000
    log_changes(call('GET', f'/sync/changes/{dbname}?for={for_param}&timestamp={stamp}'))
def sendws(method, *args):
    """Send ``[method, *args]`` as JSON, prefixed with ``42``, over the shared socket."""
    # NOTE(review): the '42' prefix is presumably the Socket.IO event-packet code.
    frame = f'42{json.dumps([method, *args])}'
    log.debug('send %s', frame)
    ws.send(frame)
class SockError(RuntimeError):
    """Raised by recvws() when the socket is closed or delivers a non-text frame."""
def recvws():
    """Receive one WebSocket frame and split it into a numeric code and a payload.

    Returns:
        ``(code, payload)``: ``code`` is the leading integer of the text
        frame; ``payload`` is the JSON-decoded remainder, or ``''`` when
        nothing follows the code.

    Raises:
        SockError: if the peer closed the socket, the frame is not a text
            frame, or the text does not begin with a numeric code.
    """
    data = ws.recv_data()
    opcode, raw = data[0], data[1]

    # OPCODE_CLOSE
    if opcode == 8:
        raise SockError('closed')
    # Anything other than OPCODE_TEXT is unexpected here.
    if opcode != 1:
        raise SockError(f'unknown opcode: {data}')

    body = raw.decode()
    # DOTALL keeps a payload that contains newlines from being truncated.
    match = re.match(r'(\d+)(.*)', body, re.DOTALL)
    if match is None:
        # Previously this surfaced as an AttributeError on None.groups().
        raise SockError(f'malformed frame: {body!r}')

    code, payload = match.groups()
    if payload:
        payload = json.loads(payload)
    return int(code), payload
def watch():
    """Create a WS connection and monitor changes.

    Opens the socket, waits for the server's ``welcome`` event, signs in with
    the module-level token, subscribes to ``mutations`` on the root path and
    then logs incoming frames until an error or interrupt ends the loop.
    The socket is closed on every exit path, including a failed handshake.
    """
    global ws
    from websocket import create_connection, WebSocketTimeoutException
    import socket
    import ssl

    ws = create_connection(
        f"ws://{ACEBASE_SERVER}/socket.io/?EIO=3&transport=websocket",
        timeout=2,
        sockopt=((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),),
        sslopt={"cert_reqs": ssl.CERT_NONE},
    )

    try:
        # Handshake: discard frames until the server sends its 'welcome' event.
        while True:
            code, msg = recvws()
            if code == 42 and msg and msg[0] == 'welcome':
                break

        sendws('signin', token)
        sendws('subscribe', dict(
            path="",
            event="mutations",
            req_id="req_xxx",
        ))

        log.info("Receiving...")
        while True:
            try:
                code, msg = recvws()
            except WebSocketTimeoutException:
                # Nothing arrived within the socket timeout: send the
                # keep-alive frame and go back to waiting.
                ws.send('3')
                continue

            # Only code-42 frames with a non-empty payload carry a named event.
            event = msg[0] if code == 42 and msg else None
            if event == 'data-event':
                log.info(
                    'event %s %s %s',
                    msg[1]["event"],
                    msg[1]["val"]["val"],
                    msg[1]["context"]["acebase_cursor"],
                )
            elif event == 'result':
                log.info('result %s', msg[1])
            else:
                # Anything not handled above is logged so it stays visible.
                log.warning('recv %s %s', code, msg)
    finally:
        log.warning('closing')
        ws.close()
# Run the demo client only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
# body=None, headers={}, *, encode_chunked=False)
[flake8]
ignore = E226,E302,E41,E128,E127,E266,E704,E731,E126,W391,E402,E305,W293,W291,E303,F401,E401,E722,E265,E115
max-line-length = 140
max-complexity = 10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment