Created
December 12, 2011 20:18
-
-
Save swill/1468908 to your computer and use it in GitHub Desktop.
Two Cloudstack authentication methods: using api keys & using user credentials
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib | |
import urllib2 | |
import hmac | |
import hashlib | |
import base64 | |
import json | |
import pprint | |
class CloudstackAPI(object): | |
""" | |
Login and run queries against the Cloudstack API. | |
Example Usage: | |
cs_api = CloudstackAPI(username='username', password='password')) | |
accounts = cs_api.request(dict({'command':'listAccounts'})) | |
""" | |
def __init__(self, protocol='http', host='127.0.0.1:8080', uri='/client/api', username=None, password=None, domain=None, logging=True): | |
self.protocol = protocol | |
self.host = host | |
self.uri = uri | |
self.username = username | |
self.password = hashlib.md5(password).hexdigest() | |
self.sessionkey = None | |
self.errors = [] | |
self.logging = logging | |
# setup cookie handling | |
self.caller = urllib2.build_opener(urllib2.HTTPCookieProcessor()) | |
urllib2.install_opener(self.caller) | |
# login so you can get a sessionkey (and a cookie) | |
login_query = dict() | |
login_query['command'] = 'login' | |
login_query['username'] = self.username | |
login_query['password'] = self.password | |
if domain: | |
login_query['domain'] = domain | |
login_result = self.request(login_query) | |
if login_result: | |
self.sessionkey = login_result['sessionkey'] | |
else: | |
self.errors.append("Login failed...") | |
print self.errors | |
def request(self, params): | |
"""Builds a query from params and return a json object of the result or None""" | |
if self.sessionkey or (self.username and self.password and params['command'] == 'login'): | |
# add the default and dynamic params | |
params['response'] = 'json' | |
if self.sessionkey: | |
params['sessionkey'] = self.sessionkey | |
# build the query string | |
query_params = map(lambda (k,v):k+"="+urllib.quote(str(v)), params.items()) | |
query_string = "&".join(query_params) | |
# final query string... | |
url = self.protocol+"://"+self.host+self.uri+"?"+query_string | |
output = None | |
try: | |
output = json.loads(self.caller.open(url).read()) | |
except urllib2.HTTPError, e: | |
self.errors.append("HTTPError: "+str(e.code)) | |
except urllib2.URLError, e: | |
self.errors.append("URLError: "+str(e.reason)) | |
if output: | |
output = output[(params['command']).lower()+'response'] | |
if self.logging: | |
with open('request.log', 'a') as f: | |
f.write('request:\n') | |
f.write(url) | |
f.write('\n\n') | |
f.write('response:\n') | |
if output: | |
pprint.pprint(output, f, 2) | |
else: | |
f.write(repr(self.errors)) | |
f.write('\n\n\n\n') | |
return output | |
else: | |
self.errors.append("missing credentials in the constructor") | |
return None | |
if __name__ == "__main__": | |
host = '127.0.0.1:8080' | |
username = 'test_a_user' | |
password = 'password' | |
domain = 'cloudops' | |
cs_api = CloudstackAPI(host=host, username=username, password=password, domain=domain) | |
cs_api.request(dict({'command':'listAccounts'})) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib | |
import urllib2 | |
import hmac | |
import hashlib | |
import base64 | |
import json | |
import pprint | |
class CloudstackAPI(object): | |
""" | |
Login and run queries against the Cloudstack API. | |
Example Usage: | |
cs_api = CloudstackAPI(api_key='api_key', secret_key='secret_key')) | |
accounts = cs_api.request(dict({'command':'listAccounts'})) | |
""" | |
def __init__(self, protocol='http', host='127.0.0.1:8080', uri='/client/api', api_key=None, secret_key=None, logging=True): | |
self.protocol = protocol | |
self.host = host | |
self.uri = uri | |
self.api_key = api_key | |
self.secret_key = secret_key | |
self.errors = [] | |
self.logging = logging | |
def request(self, params): | |
"""Builds a query from params and return a json object of the result or None""" | |
if self.api_key and self.secret_key: | |
# add the default and dynamic params | |
params['response'] = 'json' | |
params['apiKey'] = self.api_key | |
# build the query string | |
query_params = map(lambda (k,v):k+"="+urllib.quote(str(v)), params.items()) | |
query_string = "&".join(query_params) | |
# build signature | |
query_params.sort() | |
signature_string = "&".join(query_params).lower() | |
signature = urllib.quote(base64.b64encode(hmac.new(self.secret_key, signature_string, hashlib.sha1).digest())) | |
# final query string... | |
url = self.protocol+"://"+self.host+self.uri+"?"+query_string+"&signature="+signature | |
output = None | |
try: | |
output = json.loads(urllib2.urlopen(url).read()) | |
except urllib2.HTTPError, e: | |
self.errors.append("HTTPError: "+str(e.code)) | |
except urllib2.URLError, e: | |
self.errors.append("URLError: "+str(e.reason)) | |
if output: | |
output = output[(params['command']).lower()+'response'] | |
if self.logging: | |
with open('request.log', 'a') as f: | |
f.write('request:\n') | |
f.write(url) | |
f.write('\n\n') | |
f.write('response:\n') | |
if output: | |
pprint.pprint(output, f, 2) | |
else: | |
f.write(repr(self.errors)) | |
f.write('\n\n\n\n') | |
return output | |
else: | |
self.errors.append("missing api_key and secret_key in the constructor") | |
return None | |
if __name__ == "__main__": | |
host = '127.0.0.1:8080' | |
api_key = 'KdbNqrP1LpeGyZtRZ8dQm-MfyQJH030eAlVYFCADKZTTV_ZdeK9TPp7gZ7i9R5Xb6G1SjhTmlk29Mj6ajlPOWA' | |
secret_key = 'rXtRGO1VYDUnKa1iE_k1c4s6VidRnDV7yYLi-Q_0GrrlHNih7vkq5Zno-9u0Ck41K-3syrioKFYoMvObBhPDkQ' | |
cs_api = CloudstackAPI(host=host, api_key=api_key, secret_key=secret_key) | |
cs_api.request(dict({'command':'listAccounts'})) | |
cs_api.request(dict({'command':'listUsers'})) |
I want to mention that i use Python 3 .
Thanks
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi , i try to execute those codes but i have a problem with this line :
query_params = map(lambda (k,v):k+"="+urllib.quote(str(v)), params.items())
as error i have : SyntaxError: invalid syntax