Created
May 28, 2010 10:51
-
-
Save antimatter15/417035 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import httplib | |
import time | |
import oauth | |
import urllib | |
from google.appengine.api import memcache | |
from django.utils import simplejson as json | |
from google.appengine.api import users | |
from google.appengine.ext import webapp | |
from google.appengine.ext.webapp import template | |
from google.appengine.ext.webapp.util import run_wsgi_app | |
# fake urls for the test server (matches ones in server.py) | |
REQUEST_TOKEN_URL = 'https://www.google.com/accounts/OAuthGetRequestToken' | |
ACCESS_TOKEN_URL = 'https://www.google.com/accounts/OAuthGetAccessToken' | |
AUTHORIZATION_URL = 'https://www.google.com/accounts/OAuthAuthorizeToken' | |
# key and secret granted by the service provider for this consumer application - same as the MockOAuthDataStore | |
CONSUMER_KEY = 'anonymous' | |
CONSUMER_SECRET = 'anonymous' | |
# example client using httplib with headers | |
class SimpleOAuthClient(oauth.OAuthClient): | |
def __init__(self, server, port=httplib.HTTP_PORT, request_token_url='', access_token_url='', authorization_url=''): | |
self.server = server | |
self.port = port | |
self.request_token_url = request_token_url | |
self.access_token_url = access_token_url | |
self.authorization_url = authorization_url | |
self.connection = httplib.HTTPSConnection('www.google.com') | |
def fetch_request_token(self, oauth_request): | |
# via headers | |
# -> OAuthToken | |
self.connection.request(oauth_request.http_method, self.request_token_url+'?'+urllib.urlencode(oauth_request.parameters)) | |
response = self.connection.getresponse() | |
return oauth.OAuthToken.from_string(response.read()) | |
def fetch_access_token(self, oauth_request): | |
# via headers | |
# -> OAuthToken | |
self.connection.request(oauth_request.http_method, self.access_token_url, headers=oauth_request.to_header()) | |
response = self.connection.getresponse() | |
return oauth.OAuthToken.from_string(response.read()) | |
def authorize_token(self, oauth_request): | |
# via url | |
# -> typically just some okay response | |
self.connection.request(oauth_request.http_method, oauth_request.to_url()) | |
response = self.connection.getresponse() | |
return response.getheader('location')#.read() | |
class OAuthPartOne(webapp.RequestHandler): | |
def get(self): | |
client = SimpleOAuthClient("localhost", 80, REQUEST_TOKEN_URL, ACCESS_TOKEN_URL, AUTHORIZATION_URL) | |
consumer = oauth.OAuthConsumer(CONSUMER_KEY, CONSUMER_SECRET) | |
signature_method_hmac_sha1 = oauth.OAuthSignatureMethod_HMAC_SHA1() | |
import random | |
cid = str(int(random.uniform(0,999999999999))) | |
callback = self.request.url[0:-len(self.request.path+self.request.query_string)]+'/get_oauth_token2?id='+cid | |
oauth_request = oauth.OAuthRequest.from_consumer_and_token(consumer, callback=callback, http_url=client.request_token_url, parameters = {'scope':'http://wave.googleusercontent.com/api/rpc'}) | |
oauth_request.sign_request(signature_method_hmac_sha1, consumer, None) | |
token = client.fetch_request_token(oauth_request) | |
memcache.set("PK_"+cid, token.to_string()) | |
oauth_request = oauth.OAuthRequest.from_token_and_callback(token=token, http_url=client.authorization_url) | |
response = client.authorize_token(oauth_request) | |
self.redirect(response) | |
class OAuthPartTwo(webapp.RequestHandler): | |
def get(self): | |
verifier = self.request.get('oauth_verifier') | |
signature_method_hmac_sha1 = oauth.OAuthSignatureMethod_HMAC_SHA1() | |
tokenstr = memcache.get("PK_"+self.request.get('id')) | |
token = oauth.OAuthToken.from_string(tokenstr) | |
consumer = oauth.OAuthConsumer(CONSUMER_KEY, CONSUMER_SECRET) | |
client = SimpleOAuthClient("localhost", 80, REQUEST_TOKEN_URL, ACCESS_TOKEN_URL, AUTHORIZATION_URL) | |
oauth_request = oauth.OAuthRequest.from_consumer_and_token(consumer, token=token, verifier=verifier, http_url=client.access_token_url) | |
oauth_request.sign_request(signature_method_hmac_sha1, consumer, token) | |
token = client.fetch_access_token(oauth_request) | |
self.redirect('/static/ui.html?'+token.to_string()) | |
class RPC(webapp.RequestHandler): | |
def post(self): | |
tokenstr = self.request.query_string | |
signature_method_hmac_sha1 = oauth.OAuthSignatureMethod_HMAC_SHA1() | |
token = oauth.OAuthToken.from_string(tokenstr) | |
consumer = oauth.OAuthConsumer(CONSUMER_KEY, CONSUMER_SECRET) | |
client = SimpleOAuthClient("localhost", 80, REQUEST_TOKEN_URL, ACCESS_TOKEN_URL, AUTHORIZATION_URL) | |
RESOURCE = 'https://www-opensocial.googleusercontent.com/api/rpc' | |
oauth_request = oauth.OAuthRequest.from_consumer_and_token(consumer, token=token, http_method='POST', http_url=RESOURCE) | |
oauth_request.sign_request(signature_method_hmac_sha1, consumer, token) | |
connection = httplib.HTTPSConnection('www-opensocial.googleusercontent.com') | |
connection.request("POST", oauth_request.to_url(), self.request.body, {'Content-type':'application/json'}) | |
self.response.out.write(connection.getresponse().read()) | |
def main(): | |
application = webapp.WSGIApplication([('/', OAuthPartOne), | |
('/get_oauth_token2', OAuthPartTwo), | |
('/rpc', RPC)], | |
debug=True) | |
run_wsgi_app(application) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment