Instantly share code, notes, and snippets.

Embed
What would you like to do?
import httplib
import time
import oauth
import urllib
from google.appengine.api import memcache
from django.utils import simplejson as json
from google.appengine.api import users
from google.appengine.ext import webapp
from google.appengine.ext.webapp import template
from google.appengine.ext.webapp.util import run_wsgi_app
# OAuth endpoint URLs used by the handlers below: request-token, access-token
# and user-authorization. All three point at www.google.com/accounts.
# NOTE(review): an earlier comment described these as fake URLs for a test
# server (matching server.py); the literals here are Google-hosted - confirm
# which is intended.
REQUEST_TOKEN_URL = 'https://www.google.com/accounts/OAuthGetRequestToken'
ACCESS_TOKEN_URL = 'https://www.google.com/accounts/OAuthGetAccessToken'
AUTHORIZATION_URL = 'https://www.google.com/accounts/OAuthAuthorizeToken'
# Consumer key and secret used to build the OAuthConsumer in every handler.
# NOTE(review): 'anonymous'/'anonymous' looks like the credential pair for an
# unregistered consumer application - confirm against the provider's docs.
CONSUMER_KEY = 'anonymous'
CONSUMER_SECRET = 'anonymous'
# Example OAuth client that performs each step over one httplib connection.
class SimpleOAuthClient(oauth.OAuthClient):
    """Small OAuth client that issues its requests over ``httplib``.

    The three endpoint URLs are supplied by the caller.  ``server`` and
    ``port`` are stored on the instance, but the HTTPS connection itself is
    always opened against ``www.google.com``.
    """

    def __init__(self, server, port=httplib.HTTP_PORT, request_token_url='', access_token_url='', authorization_url=''):
        """Remember the endpoint configuration and create the connection."""
        self.server = server
        self.port = port
        self.request_token_url = request_token_url
        self.access_token_url = access_token_url
        self.authorization_url = authorization_url
        # The host is fixed here regardless of ``server``/``port``.
        self.connection = httplib.HTTPSConnection('www.google.com')

    def _read_token_response(self):
        """Read the pending response body and parse it into an OAuthToken."""
        body = self.connection.getresponse().read()
        return oauth.OAuthToken.from_string(body)

    def fetch_request_token(self, oauth_request):
        """Request a request token; parameters travel in the query string.

        Returns the ``oauth.OAuthToken`` parsed from the response body.
        """
        query = urllib.urlencode(oauth_request.parameters)
        target = self.request_token_url + '?' + query
        self.connection.request(oauth_request.http_method, target)
        return self._read_token_response()

    def fetch_access_token(self, oauth_request):
        """Request an access token, sending ``oauth_request.to_header()``.

        Returns the ``oauth.OAuthToken`` parsed from the response body.
        """
        auth_headers = oauth_request.to_header()
        self.connection.request(
            oauth_request.http_method,
            self.access_token_url,
            headers=auth_headers,
        )
        return self._read_token_response()

    def authorize_token(self, oauth_request):
        """Call the authorization URL and return the ``location`` header.

        The request is sent to ``oauth_request.to_url()``; the response body
        is not read.
        """
        method = oauth_request.http_method
        target = oauth_request.to_url()
        self.connection.request(method, target)
        reply = self.connection.getresponse()
        return reply.getheader('location')
class OAuthPartOne(webapp.RequestHandler):
    """First step of the OAuth flow: get a request token, then redirect.

    The request token is cached in memcache under ``"PK_" + cid`` so that the
    callback handler can find it again through the ``id`` query parameter.
    """

    def get(self):
        """Fetch a request token and redirect to the authorization location."""
        # Function-scope imports keep this handler self-contained.
        import random
        import urlparse

        client = SimpleOAuthClient("localhost", 80, REQUEST_TOKEN_URL, ACCESS_TOKEN_URL, AUTHORIZATION_URL)
        consumer = oauth.OAuthConsumer(CONSUMER_KEY, CONSUMER_SECRET)
        signature_method_hmac_sha1 = oauth.OAuthSignatureMethod_HMAC_SHA1()

        # Correlation id that ties the callback to the cached request token.
        cid = str(int(random.uniform(0, 999999999999)))

        # Build the callback from scheme + host only.  Slicing
        # len(path + query_string) off the URL miscounts whenever a query
        # string is present (the '?' separator is not part of either piece),
        # which left a trailing '/' and produced '//get_oauth_token2'.
        url_parts = urlparse.urlsplit(self.request.url)
        origin = url_parts.scheme + '://' + url_parts.netloc
        callback = origin + '/get_oauth_token2?id=' + cid

        oauth_request = oauth.OAuthRequest.from_consumer_and_token(
            consumer,
            callback=callback,
            http_url=client.request_token_url,
            parameters={'scope': 'http://wave.googleusercontent.com/api/rpc'})
        oauth_request.sign_request(signature_method_hmac_sha1, consumer, None)
        token = client.fetch_request_token(oauth_request)

        # Keep the request token for the callback handler.
        memcache.set("PK_" + cid, token.to_string())

        oauth_request = oauth.OAuthRequest.from_token_and_callback(token=token, http_url=client.authorization_url)
        response = client.authorize_token(oauth_request)
        # authorize_token returns the 'location' header of the reply.
        self.redirect(response)
class OAuthPartTwo(webapp.RequestHandler):
    """OAuth callback: exchange the cached request token for an access token."""

    def get(self):
        """Exchange the request token and redirect to the static UI.

        Reads ``oauth_verifier`` and ``id`` from the query string.  Responds
        with HTTP 400 when no request token is cached for ``id``; otherwise
        redirects to ``/static/ui.html`` with the access token string as the
        query string.
        """
        verifier = self.request.get('oauth_verifier')
        tokenstr = memcache.get("PK_" + self.request.get('id'))
        if tokenstr is None:
            # memcache.get returns None for a missing, evicted or expired key;
            # passing that to OAuthToken.from_string would raise an unhandled
            # exception, so reject the request explicitly instead.
            self.error(400)
            self.response.out.write('Unknown or expired OAuth request id; please restart the authorization.')
            return

        signature_method_hmac_sha1 = oauth.OAuthSignatureMethod_HMAC_SHA1()
        token = oauth.OAuthToken.from_string(tokenstr)
        consumer = oauth.OAuthConsumer(CONSUMER_KEY, CONSUMER_SECRET)
        client = SimpleOAuthClient("localhost", 80, REQUEST_TOKEN_URL, ACCESS_TOKEN_URL, AUTHORIZATION_URL)

        oauth_request = oauth.OAuthRequest.from_consumer_and_token(consumer, token=token, verifier=verifier, http_url=client.access_token_url)
        oauth_request.sign_request(signature_method_hmac_sha1, consumer, token)
        token = client.fetch_access_token(oauth_request)

        # The UI page receives the access token through its query string.
        self.redirect('/static/ui.html?' + token.to_string())
class RPC(webapp.RequestHandler):
    """Proxy that signs a JSON POST with the caller's token and forwards it."""

    def post(self):
        """Forward the request body to the RPC resource and relay the reply.

        The access token is taken from the raw query string of this request.
        The body is sent unchanged as ``application/json`` to the signed
        resource URL, and the upstream response body is written back.
        """
        tokenstr = self.request.query_string
        signature_method_hmac_sha1 = oauth.OAuthSignatureMethod_HMAC_SHA1()
        token = oauth.OAuthToken.from_string(tokenstr)
        consumer = oauth.OAuthConsumer(CONSUMER_KEY, CONSUMER_SECRET)

        RESOURCE = 'https://www-opensocial.googleusercontent.com/api/rpc'
        oauth_request = oauth.OAuthRequest.from_consumer_and_token(consumer, token=token, http_method='POST', http_url=RESOURCE)
        oauth_request.sign_request(signature_method_hmac_sha1, consumer, token)

        connection = httplib.HTTPSConnection('www-opensocial.googleusercontent.com')
        try:
            connection.request("POST", oauth_request.to_url(), self.request.body, {'Content-type':'application/json'})
            payload = connection.getresponse().read()
        finally:
            # Always release the outbound connection, even if the call fails.
            connection.close()
        self.response.out.write(payload)
def main():
    """Create the WSGI application with its three routes and run it."""
    routes = [
        ('/', OAuthPartOne),
        ('/get_oauth_token2', OAuthPartTwo),
        ('/rpc', RPC),
    ]
    app = webapp.WSGIApplication(routes, debug=True)
    run_wsgi_app(app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment