Created
August 10, 2010 02:27
-
-
Save grampelberg/516554 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.enterprise import adbapi
from twisted.internet import reactor
from twisted.python import log
from twisted.web import server, resource

import oauth.oauth as oauth
# URL path prefixes this server dispatches on (see Simple.handle_request).
REQUEST_TOKEN_URL = '/request_token'
ACCESS_TOKEN_URL = '/access_token'
AUTHORIZATION_URL = '/authorize'
# Callback stamped onto tokens by MockOAuthDataStore.lookup_token.
CALLBACK_URL = '/request_token_ready'
RESOURCE_URL = '/photos'
REALM = 'http://localhost:8080/'
# The single verifier value MockOAuthDataStore accepts.
VERIFIER = 'verifier'

# Shared asynchronous connection pool; queried on every GET for the token
# key/secret pair.
cp = adbapi.ConnectionPool('MySQLdb', db='foo', user='root')
class MockOAuthDataStore(oauth.OAuthDataStore):
    """In-memory OAuth data store with one consumer and one token pair.

    The consumer is hard-coded; the request token and the access token are
    both built from the ``key``/``secret`` given to the constructor. Every
    lookup compares against these fixed values and returns ``None`` on a
    mismatch.
    """

    def __init__(self, key, secret):
        """Create the store, seeding both tokens with ``key`` and ``secret``."""
        self.consumer = oauth.OAuthConsumer("asdf", "asdf")
        self.request_token = oauth.OAuthToken(key, secret)
        self.access_token = oauth.OAuthToken(key, secret)
        self.nonce = 'nonce'
        self.verifier = VERIFIER

    def lookup_consumer(self, key):
        """Return the stored consumer if ``key`` matches its key, else None."""
        if key == self.consumer.key:
            return self.consumer
        return None

    def lookup_token(self, token_type, token):
        """Return the stored ``<token_type>_token`` if its key equals ``token``.

        Returns None when the key does not match or when the store has no
        token of the requested type (previously an unknown ``token_type``
        raised AttributeError).
        """
        token_attrib = getattr(self, '%s_token' % token_type, None)
        if token_attrib is not None and token == token_attrib.key:
            # HACK: always stamp the fixed callback onto the token before
            # handing it back.
            token_attrib.set_callback(CALLBACK_URL)
            return token_attrib
        return None

    def lookup_nonce(self, oauth_consumer, oauth_token, nonce):
        """Return the stored nonce if consumer, token and nonce all match.

        A non-None return means the nonce equals the one held by the store;
        anything else yields None.
        """
        if not oauth_token:
            return None
        if oauth_consumer.key != self.consumer.key:
            return None
        known_keys = (self.request_token.key, self.access_token.key)
        if oauth_token.key in known_keys and nonce == self.nonce:
            return self.nonce
        return None

    def fetch_request_token(self, oauth_consumer, oauth_callback):
        """Return the request token for the known consumer, else None.

        When ``oauth_callback`` is given it is recorded on the token.
        """
        if oauth_consumer.key == self.consumer.key:
            if oauth_callback:
                # A real store would validate the callback here; the mock
                # store accepts whatever it is given.
                self.request_token.set_callback(oauth_callback)
            return self.request_token
        return None

    def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier):
        """Return the access token when consumer, request token and verifier match.

        Returns None on any mismatch.
        """
        if (oauth_consumer.key == self.consumer.key
                and oauth_token.key == self.request_token.key
                and oauth_verifier == self.verifier):
            # A real store would check that the request token has been
            # authorized; the mock store assumes it has.
            return self.access_token
        return None

    def authorize_request_token(self, oauth_token, user):
        """Return the request token if ``oauth_token`` refers to it, else None.

        ``user`` is ignored: the mock store records no authorization.
        """
        if oauth_token.key == self.request_token.key:
            return self.request_token
        return None
class Simple(resource.Resource):
    """Leaf resource that serves the OAuth endpoints.

    Each GET queries the database for a key/secret pair, builds an
    ``oauth.OAuthServer`` backed by ``MockOAuthDataStore`` and then
    dispatches on the request path. The response is written from the
    Deferred callback chain, so ``render_GET`` returns ``NOT_DONE_YET``.
    """

    isLeaf = True

    def create_server(self, result):
        """Build the OAuth server from the rows returned by the token query.

        ``result`` is the row list delivered by ``cp.runQuery``; the first two
        columns of the first row are used as token key and secret. The server
        is stored on ``self.oauth_server`` (as before) and also returned so
        the next callback receives the instance built for *this* request
        rather than one a concurrent request may have overwritten.

        Raises IndexError when the query returned no rows; ``render_GET``
        routes that to ``_fail_request``.
        """
        first_row = result[0]
        key, secret = first_row[0], first_row[1]
        oauth_server = oauth.OAuthServer(MockOAuthDataStore(key, secret))
        oauth_server.add_signature_method(
            oauth.OAuthSignatureMethod_PLAINTEXT())
        oauth_server.add_signature_method(
            oauth.OAuthSignatureMethod_HMAC_SHA1())
        self.oauth_server = oauth_server
        return oauth_server

    def render_GET(self, request):
        """Start the asynchronous handling of a GET request.

        Returns ``server.NOT_DONE_YET`` because the body is written and the
        request finished later, from the Deferred callbacks.
        """
        d = cp.runQuery('select k, secret from foo')
        d.addCallback(self.create_server)
        d.addCallback(self.handle_request, request)
        # Without an errback a failed query or handler error would leave the
        # client connection open forever.
        d.addErrback(self._fail_request, request)
        return server.NOT_DONE_YET

    def handle_request(self, i, request):
        """Dispatch ``request`` on its path and finish the response.

        ``i`` is the previous callback's result: the OAuth server returned by
        ``create_server``. If it is None, ``self.oauth_server`` is used.
        """
        oauth_server = i if i is not None else self.oauth_server
        headers = dict(
            (name.capitalize(), value)
            for name, value in request.getAllHeaders().items())
        oauth_request = oauth.OAuthRequest.from_request(
            request.method, request.path, headers=headers)
        if oauth_request is None:
            # from_request yields nothing when the request carries no OAuth
            # parameters; answer explicitly instead of crashing below.
            request.setResponseCode(400)
            request.finish()
            return
        try:
            if request.path.startswith(REQUEST_TOKEN_URL):
                token = oauth_server.fetch_request_token(oauth_request)
                # write() needs a string, not the OAuthToken object itself.
                request.write(token.to_string())
            if request.path.startswith(AUTHORIZATION_URL):
                pass
            if request.path.startswith(ACCESS_TOKEN_URL):
                pass
            if request.path.startswith(RESOURCE_URL):
                pass
        except oauth.OAuthError as err:
            request.setResponseCode(401)
            request.write(err.message)
        request.finish()

    def _fail_request(self, failure, request):
        """Log an unexpected failure and close the request with a 500."""
        log.err(failure)
        request.setResponseCode(500)
        request.finish()
# Serve the Simple resource on TCP port 8080 and run the reactor; run()
# blocks until the reactor is stopped.
site = server.Site(Simple())
reactor.listenTCP(8080, site)
reactor.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment