Skip to content

Instantly share code, notes, and snippets.

@dusual
Created September 20, 2011 14:07
Show Gist options
  • Save dusual/1229159 to your computer and use it in GitHub Desktop.
Save dusual/1229159 to your computer and use it in GitHub Desktop.
The main file of my new Siesta fork that uses httplib2 and handles different auth methods
# Python Siesta
#
# Copyright (c) 2008 Rafael Xavier de Souza
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Siesta is a REST client for python
"""
#__all__ = ["API", "Resource"]
__version__ = "0.5.1"
__author__ = "Sebastian Castillo <castillobuiles@gmail.com>"
__contributors__ = []
import re
import time
import urllib
import httplib2
import logging
import simplejson as json
import httplib
import base64
from urlparse import urlparse,urljoin,urlsplit
from auth import OAuthExtended
import oauth2 as oauth
logging.basicConfig(level=0)
class AuthError(AttributeError):
""" Raised when any error related to auth is there.
"""
def __init__(self, msg):
self.msg = msg
def __str__(self):
return repr(self.msg)
class Resource(object):
# TODO: some attrs could be on a inner meta class
# so Resource can have a minimalist namespace population
# and minimize collitions with resource attributes
def __init__(self, uri ,api,):
#logging.info("init.uri: %s" % uri)
self.api = api
self.uri = uri
self.scheme, self.host, self.url, z1, z2 = urlsplit(self.api.base_url,self.uri)
self.id = None
self.conn = None
self.headers = {'User-Agent': self.api.user_agent}
self.attrs = {}
self._errors = {}
def __getattr__(self, name):
"""
Resource attributes (eg: user.name) have priority
over inner rerouces (eg: users(id=123).applications)
"""
#logging.info("getattr.name: %s" % name)
# Reource attrs like: user.name
if name in self.attrs:
return self.attrs.get(name)
#logging.info("self.url: %s" % self.url)
# TODO Inner resoruces for stuff like: GET /users/{id}/applications
key = self.uri + '/' + name
self.api.resources[key] = Resource(uri = key,
api=self.api)
return self.api.resources[key]
def __call__(self, id=None):
#logging.info("call.id: %s" % id)
#logging.info("call.self.url: %s" % self.url)
if id == None:
return self
self.id = str(id)
key = self.uri + '/' + self.id
self.api.resources[key] = Resource(uri=key,
api=self.api)
return self.api.resources[key]
# Set the "Accept" request header.
# +info about request headers:
# http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html
def set_request_type(self, mime):
if mime.lower() == 'json':
mime = 'application/json'
elif mime.lower() == 'xml':
mime = 'application/xml'
self.headers['Accept'] = mime
# GET /resource
# GET /resource/id?arg1=value1&...
def get(self, **kwargs):
if self.id == None:
url = self.url
else:
url = self.url + '/' + str(self.id)
if len(kwargs) > 0:
url = "%s?%s" % (url, urllib.urlencode(kwargs))
return self._request("GET", url)
# POST /resource
def post(self, **kwargs):
data = kwargs
meta = dict([(k, data.pop(k)) for k in data.keys() if k.startswith("__")])
headers = {"Content-Type": "application/x-www-form-urlencoded"}
return self._request("POST", self.url, data, headers, meta)
# PUT /resource/id
def put(self, **kwargs):
if not self.id:
return
url = self.url + '/' + str(self.id)
data = kwargs
meta = dict([(k, data.pop(k)) for k in data.keys() if k.startswith("__")])
headers = {"Content-Type": "application/x-www-form-urlencoded"}
return self._request("PUT", url, data, headers, meta)
# DELETE /resource/id
def delete(self, id, **kwargs):
url = self.url + '/' + str(id)
data = kwargs
meta = dict([(k, data.pop(k)) for k in data.keys() if k.startswith("__")])
headers = {"Content-Type": "application/x-www-form-urlencoded"}
return self._request(url, "DELETE", data, headers, meta)
def _request(self, method, url, body='', headers={}, meta={}):
# if self.api.auth:
# headers.update(self.api.auth.make_headers())
if str(self.api.auth_type) == 'None':
self.conn = httplib2.Http(cache='.cache')
if str(self.api.auth_type) == 'Basic':
self.conn = httplib2.Http(cache='.cache')
auth = base64.encodestring( self.api.username + ':' + self.api.password )
headers = { 'Authorization' : 'Basic ' + auth }
if str(self.api.auth_type) == 'Digest':
self.conn = httplib2.Http(cache='.cache')
self.conn.add_credentials(self.api.username, self.api.password)
if str(self.api.auth_type) == 'oauth':
if self.api.consumer_key is not None and self.api.consumer_token is not None:
consumer = oauth.Consumer(self.api.consumer_key, self.api.consumer_token)
self.conn = OAuthExtended(consumer=consumer, token=None,cache='.cache',
request_token_url = self.api.request_token_url,
access_token_url = self.api.access_token_url,
authorization_url = self.api.authorization_url,
callback_url = self.api.callback_url,
)
if self.api.oauth_token is not None and self.api.oauth_token_secret is not None:
token = oauth.Token(self.api.oauth_token, self.api.oauth_token_secret)
self.conn = OAuthExtended(consumer=consumer, token=token,cache='.cache',
request_token_url = self.api.request_token_url,
access_token_url = self.api.access_token_url,
authorization_url = self.api.authorization_url,
callback_url = self.api.callback_url,
)
else:
request_tokens,pincode = self.conn.get_authentication_tokens()
token = oauth.Token(request_tokens['oauth_token'], request_tokens['oauth_token_secret'])
token.set_verifier(pincode)
self.conn = OAuthExtended(consumer=consumer, token=token,cache='.cache',
request_token_url = self.api.request_token_url,
access_token_url = self.api.access_token_url,
authorization_url = self.api.authorization_url,
callback_url = self.api.callback_url,
)
request_tokens = self.conn.get_authorized_tokens()
token = oauth.Token(request_tokens['oauth_token'], request_tokens['oauth_token_secret'])
self.conn = OAuthExtended(consumer=consumer, token=token,cache='.cache',
request_token_url = self.api.request_token_url,
access_token_url = self.api.access_token_url,
authorization_url = self.api.authorization_url,
callback_url = self.api.callback_url,
)
self.api.oauth_token = request_tokens['oauth_token']
self.api.oauth_secret = request_tokens['oauth_token_secret']
if not 'User-Agent' in headers:
headers['User-Agent'] = self.headers['User-Agent']
if not 'Accept' in headers and 'Accept' in self.headers:
headers['Accept'] = self.headers['Accept']
complete_url = urljoin(self.scheme+'://'+self.host,self.uri+url)
#logging.info(">>>>>>>>>>>>>>>>>>>method: %s" % method)
#logging.info(">>>>>>>>>>>>>>>>>>>url: %s" % url)
#logging.info(">>>>>>>>>>>>>>>>>>>headers: %s" % headers)
#logging.info(">>>>>>>>>>>>>>>>>>>body: %s" % body)
request,content = self.conn.request(complete_url, method, body, headers)
return content
class API(object):
def __init__(self, base_url, user_agent = "Python-siesta/%s" % __version__ ,
auth=None,auth_type=None,username = None,password = None,
consumer_key = None,consumer_token = None,
):
self.base_url = base_url + '/' if not base_url.endswith('/') else base_url
self.api_path = urlparse(base_url).path
self.resources = {}
self.request_type = None
self.auth_type = auth_type
self.user_agent = user_agent
self.username = username
self.password = password
self.consumer_key = "AQx9zsGz6C9mgWk9JBC9Kg"
self.consumer_token = "pZk470MGJtTBoE8obbAFdJfgviqFYTZDXhKMNtFlbQ"
self.request_token_url = 'http://twitter.com/oauth/request_token'
self.access_token_url = 'https://twitter.com/oauth/access_token'
self.authorization_url = 'http://twitter.com/oauth/authorize'
self.callback_url = None
self.oauth_token = None
self.oauth_token_secret = None
def set_request_type(self, mime):
self.request_type = mime
# set_request_type for every instantiated resources:
for resource in self.resources:
self.resources[resource].set_request_type(mime)
def __getattr__(self, name):
#logging.info("API.getattr.name: %s" % name)
key = name
if not key in self.resources:
#logging.info("Creating resource with uri: %s" % key)
self.resources[key] = Resource(uri=key,api=self,)
return self.resources[key]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment