Created
November 26, 2012 01:51
-
-
Save KazuyaHayashi/4146181 to your computer and use it in GitHub Desktop.
OAuth2 Web Server sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import logging | |
import simplejson as json | |
import os | |
import datetime | |
import functools | |
import gdata.apps.service | |
import gdata.apps.client | |
import gdata.auth | |
import gdata.gauth | |
import gdata.alt.appengine | |
from google.appengine.ext import webapp | |
from google.appengine.ext.webapp import template | |
from google.appengine.ext.webapp.util import run_wsgi_app | |
from google.appengine.ext import db | |
from gdata.apps.service import AppsForYourDomainException | |
from utils import overloaded_post, get_json_from_feed | |
INIT = { | |
'APP_NAME': 'googlecode-provisioningtester-v1', | |
'SCOPES': | |
('https://apps-apis.google.com/a/feeds/groups/ ' | |
'https://apps-apis.google.com/a/feeds/user/ ' | |
'https://apps-apis.google.com/a/feeds/emailsettings/2.0/') | |
} | |
def getClientsSecrets(): | |
f = open("client_secrets.json") | |
client_json = f.readlines() | |
return json.loads(client_json[0]) | |
class OAuth2Token(db.Model): | |
domain = db.StringProperty() | |
access_token = db.TextProperty() | |
refresh_token = db.TextProperty() | |
token_expiry = db.DateTimeProperty() | |
def oauth2(f): | |
@functools.wraps(f) | |
def wrapper(*args, **kwds): | |
self = args[0] | |
domain = args[1] | |
original_url = self.request.url | |
oauth2_url = "/oauth2/%s/login?original_url=%s" % (domain, original_url) | |
oauth2tokens = OAuth2Token.all().filter("domain =", domain).fetch(1) | |
if len(oauth2tokens) < 1: | |
return self.redirect(oauth2_url) | |
# refresh access token | |
if datetime.datetime.utcnow() > oauth2tokens[0].token_expiry: | |
logging.info(datetime.datetime.utcnow()) | |
logging.debug(oauth2tokens[0].token_expiry) | |
return self.redirect(oauth2_url) | |
return f(*args, **kwds) | |
return wrapper | |
class OAuth2Handler(webapp.RequestHandler): | |
def get(self, domain): | |
original_url = self.request.get("original_url", None) | |
if original_url is None: | |
original_url = "/oauth2/%s/userlist" % domain | |
client_secrets = getClientsSecrets() | |
token = gdata.gauth.OAuth2Token( | |
client_id=client_secrets['web']['client_id'], | |
client_secret=client_secrets['web']['client_secret'], | |
scope=INIT['SCOPES'], | |
user_agent='oauth2-provisioningv2') | |
redirect_url = token.generate_authorize_url( | |
redirect_uri=client_secrets['web']['redirect_uris'][0]) | |
state = {"domain":domain, "original_url":original_url} | |
state_json = json.dumps(state) | |
redirect_url += "&state=" + base64.b64encode(state_json) | |
return self.redirect(redirect_url) | |
class OAuth2RedirectHandler(webapp.RequestHandler): | |
def get(self): | |
state = self.request.get("state") | |
state_json = base64.b64decode(state) | |
state_dict = json.loads(state_json) | |
domain = state_dict["domain"] | |
original_url = state_dict["original_url"] | |
# create token object | |
client_secrets = getClientsSecrets() | |
code = self.request.get("code") | |
token = gdata.gauth.OAuth2Token( | |
client_id=client_secrets['web']['client_id'], | |
client_secret=client_secrets['web']['client_secret'], | |
scope=INIT['SCOPES'], | |
user_agent='oauth2-provisioningv2') | |
# setting self.redirect_url | |
token.generate_authorize_url(redirect_uri=client_secrets['web']['redirect_uris'][0]) | |
# set token info | |
token.get_access_token(code) | |
# store access token | |
oauth2tokens = OAuth2Token.all().filter("domain =", domain).fetch(1) | |
oauth2token = OAuth2Token() if len(oauth2tokens) < 1 else oauth2tokens[0] | |
oauth2token.domain = domain | |
oauth2token.access_token = token.access_token | |
oauth2token.refresh_token = token.refresh_token | |
oauth2token.token_expiry = token.token_expiry | |
oauth2token.put() | |
return self.redirect(original_url) | |
class OAuth2MainPageHandler(webapp.RequestHandler): | |
@oauth2 | |
def get(self, domain): | |
# set access token | |
oauth2tokens = OAuth2Token.all().filter("domain =", domain).fetch(1) | |
if len(oauth2tokens) < 1: | |
return self.redirect("/oauth2/") | |
oauth2token = oauth2tokens[0] | |
oauth2_auth_header = "OAuth %s" % oauth2token.access_token | |
service = gdata.apps.service.AppsService( | |
source=INIT['APP_NAME'], domain=domain, | |
additional_headers={"Authorization":oauth2_auth_header}) | |
gdata.alt.appengine.run_on_appengine(service, store_tokens=False, single_user_mode=True) | |
# request API | |
try: | |
user_feed = service.RetrieveAllUsers() | |
except AppsForYourDomainException: | |
return self.redirect("/oauth2/%s/login" % domain) | |
json = get_json_from_feed(user_feed) | |
# set template values | |
start_over_text = 'Revoke token' | |
app_title = 'Provisioning API Sample (OAuth2 Web server)' | |
template_values = { | |
'oauth_token': oauth2token.access_token, | |
'json': json, | |
'two_legged_oauth': False, | |
'start_over_text': start_over_text, | |
'app_title': app_title | |
} | |
path = os.path.join(os.path.dirname(__file__), 'index.html') | |
self.response.out.write(template.render(path, template_values)) | |
return self.response.out.write(user_feed) | |
@oauth2 | |
@overloaded_post | |
def post(self, domain): | |
# set access token | |
oauth2tokens = OAuth2Token.all().filter("domain =", domain).fetch(1) | |
if len(oauth2tokens) < 1: | |
return self.redirect("/oauth2/") | |
oauth2token = oauth2tokens[0] | |
oauth2_auth_header = "OAuth %s" % oauth2token.access_token | |
service = gdata.apps.service.AppsService( | |
source=INIT['APP_NAME'], domain=domain, | |
additional_headers={"Authorization":oauth2_auth_header}) | |
gdata.alt.appengine.run_on_appengine(service, store_tokens=False, single_user_mode=True) | |
# request API | |
try: | |
service.CreateUser( | |
user_name=self.request.get("user_name"), | |
family_name=self.request.get("family_name"), | |
given_name=self.request.get("given_name"), | |
password=self.request.get("password")) | |
redirect_url = self.request.url | |
except AppsForYourDomainException: | |
redirect_url ="/oauth2/%s/login" % domain | |
return self.redirect(redirect_url) | |
@oauth2 | |
def delete(self, domain): | |
# set access token | |
oauth2tokens = OAuth2Token.all().filter("domain =", domain).fetch(1) | |
if len(oauth2tokens) < 1: | |
return self.redirect("/oauth2/") | |
oauth2token = oauth2tokens[0] | |
oauth2_auth_header = "OAuth %s" % oauth2token.access_token | |
service = gdata.apps.service.AppsService( | |
source=INIT['APP_NAME'], domain=domain, | |
additional_headers={"Authorization":oauth2_auth_header}) | |
gdata.alt.appengine.run_on_appengine(service, store_tokens=False, single_user_mode=True) | |
# request API | |
delete_users = self.request.get_all("delete_users") | |
try: | |
for delete_user in delete_users: | |
service.DeleteUser(user_name=delete_user) | |
redirect_url = self.request.url | |
except AppsForYourDomainException: | |
redirect_url ="/oauth2/%s/login" % domain | |
return self.redirect(redirect_url) | |
class OAuth2DomainRegisterHandler(webapp.RequestHandler): | |
def get(self): | |
domains = OAuth2Token().all().fetch(1000) | |
template_vars = { | |
"domains": domains | |
} | |
return self.response.out.write( | |
template.render("./register.html", template_vars) | |
) | |
def post(self): | |
domain = OAuth2Token() | |
domain.domain = self.request.get("domain") | |
domain.put() | |
return self.redirect(self.request.url) | |
urls = [ | |
('/oauth2/$', OAuth2DomainRegisterHandler), | |
('/oauth2/([^/]+)/login', OAuth2Handler), | |
('/oauth2/([^/]+)/userlist', OAuth2MainPageHandler), | |
('/oauth2callback', OAuth2RedirectHandler) | |
] | |
def main(): | |
application = webapp.WSGIApplication(urls, debug = True) | |
run_wsgi_app(application) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment