Skip to content

Instantly share code, notes, and snippets.

@rgov
Created January 6, 2023 08:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rgov/fc4478498ef1b4aeef1f24887a69e4b2 to your computer and use it in GitHub Desktop.
Save rgov/fc4478498ef1b4aeef1f24887a69e4b2 to your computer and use it in GitHub Desktop.
Example of communicating with the Sixclear JADE TCP Server plug-in
import json
import socket
import struct
def send_obj(s, obj):
    """Send *obj* over socket *s* as one length-prefixed JSON message.

    The frame is a 4-byte length header packed with struct format ``'!l'``
    (network byte order, signed 32-bit) followed by the JSON-encoded body.
    Whether the peer really interprets the length as signed is unconfirmed.

    Args:
        s: A connected stream socket.
        obj: Any object that ``json.dumps`` can serialize.
    """
    encoded = json.dumps(obj).encode()
    # sendall() keeps writing until the whole frame is out; plain send() may
    # transmit only part of the buffer, which would corrupt the framing.
    s.sendall(struct.pack('!l', len(encoded)) + encoded)
def _recv_exactly(s, count):
    """Read exactly *count* bytes from socket *s*.

    A single ``recv`` call on a stream socket may return fewer bytes than
    requested, so keep reading until the full amount has arrived.

    Raises:
        ConnectionError: If the peer closes the connection before *count*
            bytes have been received.
    """
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = s.recv(remaining)
        if not chunk:
            # An empty read means the peer closed the connection.
            raise ConnectionError(
                'Connection closed with %d of %d bytes still expected'
                % (remaining, count)
            )
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def recv_obj(s):
    """Receive one length-prefixed JSON message from socket *s*.

    Reads a 4-byte header (struct format ``'!l'``: network byte order,
    signed 32-bit) giving the body length, then reads exactly that many
    bytes and decodes them as JSON.

    Args:
        s: A connected stream socket.

    Returns:
        The decoded JSON value.

    Raises:
        ConnectionError: If the connection closes mid-message.
        ValueError: If the length header is negative, or the body is not
            valid JSON (``json.JSONDecodeError`` is a ``ValueError``).
    """
    size, = struct.unpack('!l', _recv_exactly(s, 4))
    if size < 0:
        raise ValueError('Negative message length: %d' % size)
    return json.loads(_recv_exactly(s, size))
def send_request(s, target, operation, request):
    """Send one request over socket *s* and return the reply's value.

    The request is wrapped as ``{'target': ..., 'message': {'operation':
    ..., 'data': ...}}`` and sent with ``send_obj``; the reply is read with
    ``recv_obj``.

    Returns:
        The reply's ``'value'`` entry, or ``None`` if it has none.

    Raises:
        Exception: If the reply's ``error.status`` is truthy. A reply with
            no ``error`` entry, or no ``status`` inside it, is also treated
            as an error.
    """
    envelope = {
        'target': target,
        'message': {
            'operation': operation,
            'data': request,
        },
    }
    send_obj(s, envelope)
    reply = recv_obj(s)

    # Error handling: a truthy error.status means an error occurred. If the
    # flag is false but error.code is non-zero, the reply carries a warning.
    err = reply.get('error', {})
    if err.get('status', True):
        raise Exception(err.get('source', 'Unknown error'))
    if err.get('code', 0) != 0:
        print('Warning:', err.get('source', 'Unknown warning'))

    return reply.get('value')
def get_data(s, path):
    """Issue a 'Get Data' request for *path* to the '__SERVER__' target.

    Returns whatever ``send_request`` returns for that request.
    """
    query = {'path': path}
    return send_request(s, '__SERVER__', 'Get Data', query)
# ---- DEMO -----------------------------------------------------------------
import datetime
import time
HOST = "10.89.7.6"
PORT = 6341  # TCP Server plug-in options.server.port

# Query the '__WORKER__.timestamp' path once per second and print it as a
# local datetime until interrupted with Ctrl-C. The `with` block guarantees
# the socket is closed on exit, including when get_data() raises.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((HOST, PORT))
    try:
        while True:
            value = get_data(s, '__WORKER__.timestamp')
            print(datetime.datetime.fromtimestamp(value))
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the demo; exit quietly.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment