Skip to content

Instantly share code, notes, and snippets.

@vincenting
Created March 10, 2015 13:50
Show Gist options
  • Save vincenting/aaa06ee9b4a384205b74 to your computer and use it in GitHub Desktop.
Save vincenting/aaa06ee9b4a384205b74 to your computer and use it in GitHub Desktop.
github oauth2 for tornado.
#!/usr/bin/python
# -*- coding: utf-8 -*-
import functools
import urllib.parse as urllib_parse
from tornado.concurrent import return_future
from tornado.auth import _auth_return_future, AuthError
from tornado import httpclient
from tornado.httputil import url_concat
from tornado import escape
class GithubOAuth2Mixin(object):
_OAUTH_AUTHORIZE_URL = "https://github.com/login/oauth/authorize"
_OAUTH_ACCESS_TOKEN_URL = "https://github.com/login/oauth/access_token"
_OAUTH_USER_BASE_URL = "https://api.github.com"
_OAUTH_SETTINGS_KEY = "github_oauth"
@return_future
def authorize_redirect(self, scopes=None, response_type="code", callback=None, **kwargs):
args = {
"client_id": self.settings[self._OAUTH_SETTINGS_KEY]["key"],
"response_type": response_type
}
if kwargs:
args.update(kwargs)
if scopes:
args["scope"] = ",".join(scopes)
self.redirect(
url_concat(self._OAUTH_AUTHORIZE_URL, args))
callback()
@_auth_return_future
def get_authenticated_user(self, code, callback):
"""Handles the login for the Github user, returning a user object.
Example usage::
class GithubOAuth2LoginHandler(tornado.web.RequestHandler,
GithubOAuth2Mixin):
@tornado.gen.coroutine
def get(self):
if self.get_argument("code", False):
user = yield self.get_authenticated_user(code=self.get_argument("code"))
# Save the user with e.g. set_secure_cookie
else:
yield self.authorize_redirect(scope=["user:email"])
"""
http = self.get_auth_http_client()
body = urllib_parse.urlencode({
"code": code,
"client_id": self.settings[self._OAUTH_SETTINGS_KEY]["key"],
"client_secret": self.settings[self._OAUTH_SETTINGS_KEY]["secret"],
})
http.fetch(self._OAUTH_ACCESS_TOKEN_URL,
functools.partial(self._on_access_token, callback),
method="POST",
headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"},
body=body)
def _on_access_token(self, future, response):
"""Callback function for the exchange to the access token."""
if response.error:
future.set_exception(AuthError("Github auth error: %s" % str(response)))
return
args = escape.json_decode(escape.native_str(response.body))
access_token = args.get("access_token", None)
if not access_token:
future.set_result(None)
scopes = args["scope"].split(",")
has_user_email_scope = scopes.count("user:email") > 0
self.github_request(
path="/user",
callback=functools.partial(
self._on_get_user_info, future, access_token, has_user_email_scope),
access_token=access_token
)
def _on_get_user_info(self, future, access_token, has_user_email_scope, user):
if user is None:
future.set_result(None)
return
user.update({"access_token": access_token})
if not has_user_email_scope:
return future.set_result(user)
self.github_request(
path="/user/emails",
callback=functools.partial(
self._on_get_user_email, future, user),
access_token=access_token
)
@staticmethod
def _on_get_user_email(future, user, emails):
user.update({"private_emails": emails})
future.set_result(user)
@_auth_return_future
def github_request(self, path, callback, access_token=None, post_args=None, **args):
url = self._OAUTH_USER_BASE_URL + path
all_args = {}
if access_token:
all_args["access_token"] = access_token
all_args.update(args)
if all_args:
url += "?" + urllib_parse.urlencode(all_args)
callback = functools.partial(self._on_github_request, callback)
http = self.get_auth_http_client()
ua = "tornado"
if post_args is not None:
http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args),
callback=callback, user_agent=ua,
headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"})
else:
http.fetch(url, method="GET", callback=callback, user_agent=ua, headers={"Accept": "application/json"})
@staticmethod
def _on_github_request(future, response):
if response.error:
future.set_exception(AuthError("Error response %s fetching %s" %
(response.error, response.request.url)))
return
future.set_result(escape.json_decode(response.body))
@staticmethod
def get_auth_http_client():
return httpclient.AsyncHTTPClient()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment