Skip to content

Instantly share code, notes, and snippets.

@ghing
Created April 30, 2013 13:16
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ghing/5488610 to your computer and use it in GitHub Desktop.
Save ghing/5488610 to your computer and use it in GitHub Desktop.
Send OpenNI skeleton data over a websocket. Uses gevent-websocket for the websockety things.
#!/usr/bin/env python
## The equivalent of:
## "Working with the Skeleton"
## in the OpenNI user guide.
"""
This shows how to identify when a new user is detected, look for a pose for
that user, calibrate the users when they are in the pose, and track them,
sending updates about joint position over a websocket.
"""
import os
import json
import gevent
from gevent.pywsgi import WSGIServer
import geventwebsocket
from openni import (Context, UserGenerator, SkeletonJoint,
CALIBRATION_STATUS_OK, SKEL_PROFILE_ALL)
# Pose to use to calibrate the user
pose_to_use = 'Psi'
ctx = Context()
ctx.init()
# Create the user generator
user = UserGenerator()
user.create(ctx)
# Obtain the skeleton & pose detection capabilities
skel_cap = user.skeleton_cap
pose_cap = user.pose_detection_cap
# A set of all connected websockets
connected_sockets = set()
def broadcast(msg):
"""Send a message to all connected websockets"""
disconnected = []
for ws in connected_sockets:
try:
ws.send(msg)
except geventwebsocket.WebSocketError, ex:
print "{0}: {1}".format(ex.__class__.__name__, ex)
disconnected.append(ws)
except Exception:
# Send failed, consider the given socket disconnected
# HACK: This will catch other exceptions as well. Is there
# a cleaner way to track this?
# It seems like socket.io sends a heartbeat and tracks this.
# If the current method of detecting disconnections doesn't work,
# we could try using gevent-socketio instead of gevent-websocket
disconnected.append(ws)
for ws in disconnected:
# Remove the disconnected sockets from the set
connected_sockets.discard(ws)
# Declare the callbacks
def new_user(src, id):
print "1/4 User {} detected. Looking for pose..." .format(id)
pose_cap.start_detection(pose_to_use, id)
# TODO: Eliminate pose detection if possible
def pose_detected(src, pose, id):
print "2/4 Detected pose {} on user {}. Requesting calibration..." .format(pose,id)
pose_cap.stop_detection(id)
skel_cap.request_calibration(id, True)
def calibration_start(src, id):
print "3/4 Calibration started for user {}." .format(id)
# TODO: broadcast when a new user is being tracked
def calibration_complete(src, id, status):
if status == CALIBRATION_STATUS_OK:
print "4/4 User {} calibrated successfully! Starting to track." .format(id)
skel_cap.start_tracking(id)
else:
print "ERR User {} failed to calibrate. Restarting process." .format(id)
new_user(user, id)
# TODO: broadcast when a user is lost
def lost_user(src, id):
print "--- User {} lost." .format(id)
# Register them
user.register_user_cb(new_user, lost_user)
pose_cap.register_pose_detected_cb(pose_detected)
skel_cap.register_c_start_cb(calibration_start)
skel_cap.register_c_complete_cb(calibration_complete)
# Set the profile
skel_cap.set_profile(SKEL_PROFILE_ALL)
# Start generating
ctx.start_generating_all()
print "0/4 Starting to detect users. Press Ctrl-C to exit."
def poll_openni():
while True:
# Update to next frame
ctx.wait_and_update_all()
# Extract joint positions of each tracked user
for id in user.users:
if skel_cap.is_tracking(id):
for joint_name, joint in SkeletonJoint.names.iteritems():
if skel_cap.is_joint_active(joint):
pos = skel_cap.get_joint_position(id, joint)
joint_event_msg = {
'user': id,
'joint': joint_name,
'x': pos.point[0],
'y': pos.point[1],
'z': pos.point[0]
}
broadcast(json.dumps(joint_event_msg))
gevent.sleep(0)
def ws_handler(environ, start_response):
websocket = environ.get("wsgi.websocket")
if websocket is None:
return http_handler(environ, start_response)
try:
connected_sockets.add(websocket)
while True:
gevent.sleep(0)
websocket.close()
except geventwebsocket.WebSocketError, ex:
print "{0}: {1}".format(ex.__class__.__name__, ex)
def http_handler(environ, start_response):
if environ["PATH_INFO"].strip("/") == "version":
start_response("200 OK", [])
return [agent]
else:
start_response("400 Bad Request", [])
return ["WebSocket connection is expected here."]
# TODO: Serve html file and js, maybe with Flask
# TODO: Make port an option
if __name__ == '__main__':
path = os.path.dirname(geventwebsocket.__file__)
agent = "gevent-websocket/%s" % (geventwebsocket.__version__)
print "Running %s from %s" % (agent, path)
gevent.spawn(poll_openni)
WSGIServer(("0.0.0.0", 8080), ws_handler, handler_class=geventwebsocket.WebSocketHandler).serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment