Skip to content

Instantly share code, notes, and snippets.

@nabam
Last active December 11, 2019 10:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nabam/73717f709df182ccc59fae64e1668030 to your computer and use it in GitHub Desktop.
Save nabam/73717f709df182ccc59fae64e1668030 to your computer and use it in GitHub Desktop.
import http.server
import socketserver
import threading
import queue
import json
CALLBACK_SERVER_PORT = 5000
CALLBACK_TIMEOUT = 10 # seconds
callback_queues = {}
queues_lock = threading.Lock()
def got_callback(path, data):
queues_lock.acquire()
if path not in callback_queues.keys():
callback_queues[path] = queue.Queue(10)
queues_lock.release()
callback_queues[path].put(data, timeout=CALLBACK_TIMEOUT)
def wait_for_callback(path):
queues_lock.acquire()
if path not in callback_queues.keys():
callback_queues[path] = queue.Queue(10)
queues_lock.release()
try:
return callback_queues[path].get(timeout=CALLBACK_TIMEOUT)
except queue.Empty:
raise Exception(f"Timed out waiting for callback at {path}") from None
class CallbackServer(http.server.BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
self.protocol_version = "HTTP/1.1"
super(CallbackServer, self).__init__(*args, **kwargs)
def send_error(self, code, content):
self.send_response(code)
self.send_header('Content-Length', len(content))
self.send_header('Connection', 'close')
self.end_headers()
def do_POST(self):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
self.send_response(200)
self.send_header('Content-Length', 0)
self.send_header('Connection', 'close')
self.end_headers()
got_callback(self.path, post_data.decode('utf-8'))
def do_GET(self):
self.send_response(200)
self.send_header('Content-Length', 0)
self.send_header('Connection', 'close')
self.end_headers()
def _reduce_precision(data, ndigits=3):
queue = [data]
while(True):
c = queue.pop()
idxs = []
if isinstance(c, dict):
idxs = c.keys()
elif isinstance(c, list):
idxs = range(0, len(c))
for k in idxs:
if isinstance(c[k], dict) or isinstance(c[k], list):
queue.append(c[k])
if isinstance(c[k], float):
c[k] = round(c[k], ndigits)
if len(queue) == 0:
break
def get_callback(request, endpoint=None, expected=None):
if isinstance(expected, dict):
body = json.loads(wait_for_callback(endpoint))
_reduce_precision(body)
assert body == expected, f"Callback body {body} != {expected}"
elif isinstance(expected, list):
bodies = []
for e in expected:
body = json.loads(wait_for_callback(endpoint))
_reduce_precision(body)
bodies.append(body)
for e in expected:
assert e in bodies, f"Callback body {e} not found in {bodies}"
httpd = socketserver.ThreadingTCPServer(('', CALLBACK_SERVER_PORT), CallbackServer, bind_and_activate=False)
httpd.allow_reuse_address = True
httpd.server_bind()
httpd.server_activate()
callback_server_thread = threading.Thread(target=httpd.serve_forever)
callback_server_thread.daemon = True
callback_server_thread.start()
- name: trigger drilling feature detection
request:
url: "..."
method: POST
verify: false
json:
cam_feature_types: [0, 1, 2]
coordinate: &hole_coordinates [-19.345635117104933, -75.026257203182653, 53.57106023475771]
callback: "{callback_server}/hole_hafd"
response:
status_code: 200
verify_response_with:
function: callback_server:get_callback
extra_kwargs:
endpoint: "/hole_hafd"
expected:
- cam_feature_type: 0
job_id: "{job_id:s}"
successful: True
properties:
fpc_bottom_radiuses: []
fpc_closed_cavity_max_radius: 3.980
fpc_contour_radiuses: [4.0]
fpc_is_closed_cavity: True
fpc_total_depth: 12.370
- cam_feature_type: 1
job_id: "{job_id:s}"
successful: False
error_message: "No material to remove"
- cam_feature_type: 2
job_id: "{job_id:s}"
successful: True
properties:
fpd_depth: 10.0
fpd_hole_type: 1
fpd_position: [-22.985, -76.681, 60.0]
fpd_radius: 4.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment