Skip to content

Instantly share code, notes, and snippets.

@pvazteixeira
Last active August 10, 2018 03:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pvazteixeira/42d2cdac5c0e8170259e1622078d44d4 to your computer and use it in GitHub Desktop.
Save pvazteixeira/42d2cdac5c0e8170259e1622078d44d4 to your computer and use it in GitHub Desktop.
synchronySDK_py server
import time
import zmq
import json
from Navi import *
from NaviUtils import *
# TODO: configurable port
# TODO: configurable id
# TODO: configurable sessionid
# robotId = 'FeitorBot'
# sessionId = 'FeitorBotTestSession001'
navi = Navi('/home/pvt/Documents/synchronyConfig.json')
print 'navi server status:',navi.printStatus()
time.sleep(2)
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
robot = []
robot_id = []
sesion = []
session_id = []
while True:
# Wait for next request from client
message = socket.recv()
req = json.loads(message)
# process the request
if req['type'] == "AddOdometry2D":
delta = np.asarray(req['measurement'])
p = np.asarray(req['covariance'])
addNodeFactor_OdoNormal(navi, time.time(), req['robot_id'], req['session_id'],delta, p)
socket.send(b"OK")
elif req['type'] == "RegisterRobot":
robot_id = req['robot_id']
robot = registerRobot(navi,req['robot_id'])
socket.send(b"OK")
elif req['type'] == "RegisterSession":
session_id = req['session_id']
session = registerSession(navi, robot_id, req['session_id'])
socket.send(b"OK")
else:
print 'unsupported request type',req['type']
socket.send(b"KO")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment