Skip to content

Instantly share code, notes, and snippets.

@randym32
Created December 31, 2020 01:09
Show Gist options
  • Save randym32/16bde0ce2dda841336e3f9a250cca009 to your computer and use it in GitHub Desktop.
Save randym32/16bde0ce2dda841336e3f9a250cca009 to your computer and use it in GitHub Desktop.
Anki Vector SDK Setup Helper utility
#!/usr/bin/env python3
"""
***Anki Vector SDK Setup***
Vector requires all requests be authorized by an authenticated Anki user.
This script will enable this device to authenticate with your Vector
robot for use with a Vector Python SDK program.
Vector must be powered on and connected on the same network as your
computer. By running this script, you will be asked to provide your
Anki account credentials, and the script will download an authentication
token that will grant you access to the robot and his
capabilities (such as camera and audio) as well as data stored on the
robot (such as faces and photos).
Author: Mike Corlett
"""
import json
import requests
import sys
import base64
import getpass
from datetime import datetime
import urllib3
urllib3.disable_warnings()
# TODO Read config from sdk config file
robots = {
'v1': { 'ip' : '192.168.0.1', 'guid' : 'sfdkjadsA2898971a/mm==', 'name' : 'v1',
'cert' : '/Users/mike/.anki_vector/Vector-A1B2-00102346.cert' }
}
def url_for( r, method ):
return "https://{}/v1/{}".format(r['ip'],method)
class Robot(object):
def __init__( self, vec_config ):
self._cfg = vec_config
def name(self):
return self._cfg['name']
# Make a request and return a pair (status_code,result_json)
# result_json is null if the call fails
def request( self, path, request_json ):
url = url_for( self._cfg, path )
auth = 'Bearer ' + self._cfg['guid']
headers = {"Authorization": auth, "Content-Type" : "application/json" }
try:
r = requests.post( url, json=request_json, headers=headers, stream=False,
# cert = self._cfg['cert'] )
verify=False )
except requests.exceptions.ConnectionError:
print( "Failed to connect" )
return (500, None)
except requests.exceptions.Timeout:
print( "Timeout" )
return (408, None)
except requests.exceptions.RequestException as e:
return (500, None)
if r.status_code == 200:
return (r.status_code, r.json())
return (r.status_code, None)
# Make a request to a streaming URL and pass the streamed
# data to a dispatch function
def request_stream( self, path, request_json, dispatch ):
url = url_for( self._cfg, path )
auth = 'Bearer ' + self._cfg['guid']
headers = {"Authorization": auth, "Content-Type" : "application/json" }
try:
r = requests.post( url, json=request_json, headers=headers, verify=False, stream=True)
except requests.exceptions.ConnectionError:
print( "Failed to connect" )
return (500, None)
except requests.exceptions.Timeout:
print( "Timeout" )
return (408, None)
except requests.exceptions.RequestException as e:
print(e)
return (500, None)
for line in r.iter_lines():
# filter out keep-alive new lines
if line:
decoded_line = line.decode('utf-8')
dispatch( self, decoded_line )
def version( self ):
res = self.request( "version_state", {} )
if res[1]:
return res[1]['os_version']
print( res[0] )
return None
def snapshot( self ):
date_fname = self._cfg['name'] + '-' + datetime.now().strftime("%Y-%m-%d-%H%M%S.jpg")
res = self.request( "capture_single_image", { "enable_high_resolution" : False} )
j = res[1]
if j:
img_data = base64.b64decode(j['data'])
f = open( date_fname, "wb")
f.write( img_data )
f.close()
def app_intent( self, intent, param ):
j = { "intent" : intent, "param" : param }
res = self.request('app_intent', j )
if res[1]:
print( res[1])
else:
print( res[0] )
def enable_markers( self, enable ):
j = { 'enable': enable }
res = self.request('enable_marker_detection', j )
if res[1]:
print( res[1])
else:
print( res[0] )
def sdk_guid( self, session_id ):
params = {"user_session_id": session_id }
self._cfg['guid'] = session_id
res = self.request( 'user_authentication', params )
if res[1]:
guid = base64.b64decode(res[1]['client_token_guid']).decode( 'utf-8' )
return guid
else:
print( res[0] )
##########################################################
# Get session from anki server. Used to get an SDK GUID from the robot
anki_session_server ='https://accounts.api.anki.com/1/sessions'
def get_session_id():
try:
print("Enter your email and password. Make sure to use the same account that was used to set up your Vector.")
username = input("Enter Email: ")
pw = getpass.getpass(prompt='Password:')
headers = { 'anki-app-key' : 'oDoa0quieSeir6goowai7f' }
req = {'username': username, 'password' : pw }
r = requests.post( anki_session_server, req, headers=headers )
if r.status_code == 200:
j = r.json()
return j['session']['session_token']
else:
print( r.status_code )
except Exception as error:
print( error)
return None
def robot_factory( name ):
if name in robots:
return Robot( robots[name])
return Robot({ 'ip' : name})
return None
if __name__ == "__main__":
robit = robot_factory(sys.argv[1])
if len(sys.argv) > 2:
cmd = sys.argv[2]
if cmd == 'snap':
robit.snapshot()
elif cmd == 'version':
print( robit.version() )
elif cmd == 'intent':
robit.app_intent( sys.argv[3], "" )
elif cmd == 'volume':
robit.app_intent( sys.argv[3], "" )
elif cmd == 'markers':
robit.enable_markers( True )
elif cmd == 'guid':
# Get a session ID
sess = get_session_id()
if sess:
# Success - send to the robot and get a guid in return
enc_sess = base64.b64encode( bytes(sess, 'utf-8')).decode('utf-8')
print(robit.sdk_guid( enc_sess ))
else:
print( "Failed to get session id")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment