Skip to content

Instantly share code, notes, and snippets.

@KazuyaHayashi
Created November 26, 2012 01:51
Show Gist options
  • Save KazuyaHayashi/4146181 to your computer and use it in GitHub Desktop.
Save KazuyaHayashi/4146181 to your computer and use it in GitHub Desktop.
OAuth2 Web Server sample
import base64
import logging
import simplejson as json
import os
import datetime
import functools
import gdata.apps.service
import gdata.apps.client
import gdata.auth
import gdata.gauth
import gdata.alt.appengine
from google.appengine.ext import webapp
from google.appengine.ext.webapp import template
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext import db
from gdata.apps.service import AppsForYourDomainException
from utils import overloaded_post, get_json_from_feed
INIT = {
'APP_NAME': 'googlecode-provisioningtester-v1',
'SCOPES':
('https://apps-apis.google.com/a/feeds/groups/ '
'https://apps-apis.google.com/a/feeds/user/ '
'https://apps-apis.google.com/a/feeds/emailsettings/2.0/')
}
def getClientsSecrets():
f = open("client_secrets.json")
client_json = f.readlines()
return json.loads(client_json[0])
class OAuth2Token(db.Model):
domain = db.StringProperty()
access_token = db.TextProperty()
refresh_token = db.TextProperty()
token_expiry = db.DateTimeProperty()
def oauth2(f):
@functools.wraps(f)
def wrapper(*args, **kwds):
self = args[0]
domain = args[1]
original_url = self.request.url
oauth2_url = "/oauth2/%s/login?original_url=%s" % (domain, original_url)
oauth2tokens = OAuth2Token.all().filter("domain =", domain).fetch(1)
if len(oauth2tokens) < 1:
return self.redirect(oauth2_url)
# refresh access token
if datetime.datetime.utcnow() > oauth2tokens[0].token_expiry:
logging.info(datetime.datetime.utcnow())
logging.debug(oauth2tokens[0].token_expiry)
return self.redirect(oauth2_url)
return f(*args, **kwds)
return wrapper
class OAuth2Handler(webapp.RequestHandler):
def get(self, domain):
original_url = self.request.get("original_url", None)
if original_url is None:
original_url = "/oauth2/%s/userlist" % domain
client_secrets = getClientsSecrets()
token = gdata.gauth.OAuth2Token(
client_id=client_secrets['web']['client_id'],
client_secret=client_secrets['web']['client_secret'],
scope=INIT['SCOPES'],
user_agent='oauth2-provisioningv2')
redirect_url = token.generate_authorize_url(
redirect_uri=client_secrets['web']['redirect_uris'][0])
state = {"domain":domain, "original_url":original_url}
state_json = json.dumps(state)
redirect_url += "&state=" + base64.b64encode(state_json)
return self.redirect(redirect_url)
class OAuth2RedirectHandler(webapp.RequestHandler):
def get(self):
state = self.request.get("state")
state_json = base64.b64decode(state)
state_dict = json.loads(state_json)
domain = state_dict["domain"]
original_url = state_dict["original_url"]
# create token object
client_secrets = getClientsSecrets()
code = self.request.get("code")
token = gdata.gauth.OAuth2Token(
client_id=client_secrets['web']['client_id'],
client_secret=client_secrets['web']['client_secret'],
scope=INIT['SCOPES'],
user_agent='oauth2-provisioningv2')
# setting self.redirect_url
token.generate_authorize_url(redirect_uri=client_secrets['web']['redirect_uris'][0])
# set token info
token.get_access_token(code)
# store access token
oauth2tokens = OAuth2Token.all().filter("domain =", domain).fetch(1)
oauth2token = OAuth2Token() if len(oauth2tokens) < 1 else oauth2tokens[0]
oauth2token.domain = domain
oauth2token.access_token = token.access_token
oauth2token.refresh_token = token.refresh_token
oauth2token.token_expiry = token.token_expiry
oauth2token.put()
return self.redirect(original_url)
class OAuth2MainPageHandler(webapp.RequestHandler):
@oauth2
def get(self, domain):
# set access token
oauth2tokens = OAuth2Token.all().filter("domain =", domain).fetch(1)
if len(oauth2tokens) < 1:
return self.redirect("/oauth2/")
oauth2token = oauth2tokens[0]
oauth2_auth_header = "OAuth %s" % oauth2token.access_token
service = gdata.apps.service.AppsService(
source=INIT['APP_NAME'], domain=domain,
additional_headers={"Authorization":oauth2_auth_header})
gdata.alt.appengine.run_on_appengine(service, store_tokens=False, single_user_mode=True)
# request API
try:
user_feed = service.RetrieveAllUsers()
except AppsForYourDomainException:
return self.redirect("/oauth2/%s/login" % domain)
json = get_json_from_feed(user_feed)
# set template values
start_over_text = 'Revoke token'
app_title = 'Provisioning API Sample (OAuth2 Web server)'
template_values = {
'oauth_token': oauth2token.access_token,
'json': json,
'two_legged_oauth': False,
'start_over_text': start_over_text,
'app_title': app_title
}
path = os.path.join(os.path.dirname(__file__), 'index.html')
self.response.out.write(template.render(path, template_values))
return self.response.out.write(user_feed)
@oauth2
@overloaded_post
def post(self, domain):
# set access token
oauth2tokens = OAuth2Token.all().filter("domain =", domain).fetch(1)
if len(oauth2tokens) < 1:
return self.redirect("/oauth2/")
oauth2token = oauth2tokens[0]
oauth2_auth_header = "OAuth %s" % oauth2token.access_token
service = gdata.apps.service.AppsService(
source=INIT['APP_NAME'], domain=domain,
additional_headers={"Authorization":oauth2_auth_header})
gdata.alt.appengine.run_on_appengine(service, store_tokens=False, single_user_mode=True)
# request API
try:
service.CreateUser(
user_name=self.request.get("user_name"),
family_name=self.request.get("family_name"),
given_name=self.request.get("given_name"),
password=self.request.get("password"))
redirect_url = self.request.url
except AppsForYourDomainException:
redirect_url ="/oauth2/%s/login" % domain
return self.redirect(redirect_url)
@oauth2
def delete(self, domain):
# set access token
oauth2tokens = OAuth2Token.all().filter("domain =", domain).fetch(1)
if len(oauth2tokens) < 1:
return self.redirect("/oauth2/")
oauth2token = oauth2tokens[0]
oauth2_auth_header = "OAuth %s" % oauth2token.access_token
service = gdata.apps.service.AppsService(
source=INIT['APP_NAME'], domain=domain,
additional_headers={"Authorization":oauth2_auth_header})
gdata.alt.appengine.run_on_appengine(service, store_tokens=False, single_user_mode=True)
# request API
delete_users = self.request.get_all("delete_users")
try:
for delete_user in delete_users:
service.DeleteUser(user_name=delete_user)
redirect_url = self.request.url
except AppsForYourDomainException:
redirect_url ="/oauth2/%s/login" % domain
return self.redirect(redirect_url)
class OAuth2DomainRegisterHandler(webapp.RequestHandler):
def get(self):
domains = OAuth2Token().all().fetch(1000)
template_vars = {
"domains": domains
}
return self.response.out.write(
template.render("./register.html", template_vars)
)
def post(self):
domain = OAuth2Token()
domain.domain = self.request.get("domain")
domain.put()
return self.redirect(self.request.url)
urls = [
('/oauth2/$', OAuth2DomainRegisterHandler),
('/oauth2/([^/]+)/login', OAuth2Handler),
('/oauth2/([^/]+)/userlist', OAuth2MainPageHandler),
('/oauth2callback', OAuth2RedirectHandler)
]
def main():
application = webapp.WSGIApplication(urls, debug = True)
run_wsgi_app(application)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment