Created
December 31, 2020 01:09
-
-
Save randym32/16bde0ce2dda841336e3f9a250cca009 to your computer and use it in GitHub Desktop.
Anki Vector SDK Setup Helper utility
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
***Anki Vector SDK Setup***

Vector requires that all requests be authorized by an authenticated Anki user.

This script will enable this device to authenticate with your Vector
robot for use with a Vector Python SDK program.

Vector must be powered on and connected to the same network as your
computer. By running this script, you will be asked to provide your
Anki account credentials, and the script will download an authentication
token that will grant you access to the robot and his
capabilities (such as camera and audio) as well as data stored on the
robot (such as faces and photos).

Author: Mike Corlett
"""
import base64
import getpass
import json
import sys
from datetime import datetime

import requests
import urllib3

# Silence urllib3 warnings for the whole process.
urllib3.disable_warnings()
# TODO Read config from sdk config file
# Table of known robots, keyed by the short name that robot_factory() looks up.
# NOTE(review): the values below look like placeholders — replace them with the
# address, token and certificate path of your own robot.
robots = {
    'v1': {
        'ip': '192.168.0.1',
        'guid': 'sfdkjadsA2898971a/mm==',
        'name': 'v1',
        'cert': '/Users/mike/.anki_vector/Vector-A1B2-00102346.cert',
    },
}
def url_for(r, method):
    """Return the HTTPS URL of API call *method* on the robot described by *r*.

    Args:
        r: Robot config mapping; only its ``'ip'`` entry is read.
        method: Path component appended after ``/v1/``.

    Returns:
        A string of the form ``https://<ip>/v1/<method>``.
    """
    return "https://{}/v1/{}".format(r['ip'], method)
class Robot(object):
    """Small HTTPS client for one Vector robot.

    The robot is described by a config mapping.  The keys read by this class
    are ``'ip'`` (address used to build URLs), ``'guid'`` (sent as a bearer
    token when present) and ``'name'`` (used for display and file names;
    the address is used when it is missing).
    """

    def __init__(self, vec_config):
        # Keep a private copy: sdk_guid() overwrites 'guid', and the caller's
        # mapping (for example an entry of a shared table) must not change.
        self._cfg = dict(vec_config)

    def name(self):
        """Return the configured name, or the address if no name was given."""
        cfg = self._cfg
        return cfg['name'] if 'name' in cfg else cfg.get('ip')

    def _headers(self):
        """Build the request headers, adding the bearer token if one is set."""
        headers = {"Content-Type": "application/json"}
        guid = self._cfg.get('guid')
        if guid is not None:
            headers["Authorization"] = 'Bearer ' + guid
        return headers

    def _post(self, path, request_json, stream, timeout):
        """POST *request_json* to *path*; return ``(status_code, response)``.

        On a transport error the error is printed and the response slot is
        None, with status 408 for a timeout and 500 for anything else.
        """
        url = url_for(self._cfg, path)
        try:
            # verify=False disables TLS certificate verification; the
            # original left checking against cfg['cert'] commented out.
            r = requests.post(url, json=request_json, headers=self._headers(),
                              stream=stream, verify=False, timeout=timeout)
        except requests.exceptions.ConnectionError:
            print("Failed to connect")
            return (500, None)
        except requests.exceptions.Timeout:
            print("Timeout")
            return (408, None)
        except requests.exceptions.RequestException as e:
            print(e)
            return (500, None)
        return (r.status_code, r)

    @staticmethod
    def _print_result(res):
        """Print the JSON payload of a request() result, else its status code."""
        if res[1]:
            print(res[1])
        else:
            print(res[0])

    def request(self, path, request_json, timeout=None):
        """Make a request and return a pair ``(status_code, result_json)``.

        Args:
            path: API method name appended to the robot's base URL.
            request_json: Object sent as the JSON request body.
            timeout: Optional timeout passed through to ``requests.post``;
                None (the default) means no timeout.

        Returns:
            ``(status_code, result_json)``.  ``result_json`` is None if the
            call fails: transport error, non-200 status, or a body that is
            not valid JSON.
        """
        status, r = self._post(path, request_json, False, timeout)
        if r is None or status != 200:
            return (status, None)
        try:
            return (status, r.json())
        except ValueError:
            # A 200 reply whose body is not JSON counts as a failed call.
            print("Invalid JSON in response")
            return (status, None)

    def request_stream(self, path, request_json, dispatch, timeout=None):
        """Make a request to a streaming URL and pass each line to *dispatch*.

        Args:
            path: API method name appended to the robot's base URL.
            request_json: Object sent as the JSON request body.
            dispatch: Callable invoked as ``dispatch(robot, line)`` for every
                non-empty line, decoded as UTF-8.
            timeout: Optional timeout passed through to ``requests.post``;
                None (the default) means no timeout.

        Returns:
            ``(status_code, None)`` if the connection could not be made,
            otherwise None once the stream ends.
        """
        status, r = self._post(path, request_json, True, timeout)
        if r is None:
            return (status, None)
        try:
            for line in r.iter_lines():
                # filter out keep-alive new lines
                if line:
                    dispatch(self, line.decode('utf-8'))
        finally:
            # A streamed response holds its connection until it is closed.
            r.close()

    def version(self):
        """Return the robot's ``os_version``, or None (printing the status) on failure."""
        res = self.request("version_state", {})
        if res[1]:
            return res[1]['os_version']
        print(res[0])
        return None

    def snapshot(self):
        """Capture one image and write it to ``<name>-<timestamp>.jpg``.

        The file is written in the current directory.  On failure the status
        code is printed and no file is created.
        """
        date_fname = self.name() + '-' + datetime.now().strftime("%Y-%m-%d-%H%M%S.jpg")
        res = self.request("capture_single_image", {"enable_high_resolution": False})
        j = res[1]
        if not j:
            print(res[0])
            return
        img_data = base64.b64decode(j['data'])
        with open(date_fname, "wb") as f:
            f.write(img_data)

    def app_intent(self, intent, param):
        """Send an ``app_intent`` request and print the reply or the status code."""
        self._print_result(self.request('app_intent', {"intent": intent, "param": param}))

    def enable_markers(self, enable):
        """Send ``enable_marker_detection`` and print the reply or the status code."""
        self._print_result(self.request('enable_marker_detection', {'enable': enable}))

    def sdk_guid(self, session_id):
        """Exchange a user session id for an SDK client token guid.

        Args:
            session_id: Session token sent as ``user_session_id``.

        Returns:
            The base64-decoded ``client_token_guid`` as a string, or None
            (after printing the status code) if the request fails.
        """
        params = {"user_session_id": session_id}
        # The session id is also sent as the bearer token for this call, so it
        # replaces whatever guid this instance held before.
        self._cfg['guid'] = session_id
        res = self.request('user_authentication', params)
        if res[1]:
            return base64.b64decode(res[1]['client_token_guid']).decode('utf-8')
        print(res[0])
        return None
##########################################################
# Get session from anki server.  Used to get an SDK GUID from the robot
anki_session_server = 'https://accounts.api.anki.com/1/sessions'


def get_session_id():
    """Prompt for Anki account credentials and fetch a session token.

    The email and password are read interactively and posted to
    ``anki_session_server``.

    Returns:
        The ``session_token`` string from the server's JSON reply, or None
        if the server answers with a non-200 status (the status is printed)
        or any error occurs (the error is printed).
    """
    try:
        print("Enter your email and password. Make sure to use the same account that was used to set up your Vector.")
        username = input("Enter Email: ")
        pw = getpass.getpass(prompt='Password:')
        headers = {'anki-app-key': 'oDoa0quieSeir6goowai7f'}
        req = {'username': username, 'password': pw}
        # data= makes explicit that the credentials go in the request body
        # form-encoded (the original passed them as the second positional
        # argument, which is the same parameter).
        r = requests.post(anki_session_server, data=req, headers=headers)
        if r.status_code == 200:
            return r.json()['session']['session_token']
        print(r.status_code)
    except Exception as error:
        # Top-level boundary of an interactive script: report and give up.
        print(error)
    return None
def robot_factory(name):
    """Return a Robot for *name*.

    Args:
        name: Either a key of the module-level ``robots`` table or the
            address of a robot that is not in the table.

    Returns:
        A Robot built from the table entry when *name* is a known key;
        otherwise a Robot whose address and name are both *name*.
    """
    if name in robots:
        return Robot(robots[name])
    # Unknown names are treated as an address.  'name' is filled in as well
    # so code that reads the robot's name does not fail on this config.
    return Robot({'ip': name, 'name': name})
def main(argv):
    """Run one robot command from the command line.

    Args:
        argv: Full argument vector, ``[prog, robot, command, *args]``.
            ``robot`` is passed to ``robot_factory``; ``command`` is one of
            ``snap``, ``version``, ``intent``, ``volume``, ``markers``,
            ``guid``.  ``intent`` and ``volume`` need one more argument.

    Returns:
        Process exit status: 0 when the command was dispatched (or no command
        was given), 1 when no session id could be obtained, 2 on a usage
        error.
    """
    commands = ('snap', 'version', 'intent', 'volume', 'markers', 'guid')
    usage = "usage: {} <robot-name-or-ip> <snap|version|intent NAME|volume NAME|markers|guid>".format(
        argv[0] if argv else "vector")

    # All argument checks happen before any robot or network access.
    if len(argv) < 2:
        print(usage)
        return 2
    if len(argv) < 3:
        # A robot without a command was accepted as a no-op before; keep the
        # zero exit status but tell the user what is available.
        print(usage)
        return 0
    cmd = argv[2]
    if cmd not in commands:
        print("Unknown command: " + cmd)
        print(usage)
        return 2
    if cmd in ('intent', 'volume') and len(argv) < 4:
        print(usage)
        return 2

    robit = robot_factory(argv[1])
    if cmd == 'snap':
        robit.snapshot()
    elif cmd == 'version':
        print(robit.version())
    elif cmd in ('intent', 'volume'):
        # NOTE(review): 'volume' sends exactly the same app_intent call as
        # 'intent' (empty param) — possibly a copy/paste placeholder; confirm.
        robit.app_intent(argv[3], "")
    elif cmd == 'markers':
        robit.enable_markers(True)
    else:  # 'guid'
        # Get a session ID
        sess = get_session_id()
        if not sess:
            print("Failed to get session id")
            return 1
        # Success - send to the robot and get a guid in return
        enc_sess = base64.b64encode(bytes(sess, 'utf-8')).decode('utf-8')
        print(robit.sdk_guid(enc_sess))
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment