Skip to content

Instantly share code, notes, and snippets.

@ysimonson
Last active February 22, 2016 19:56
Show Gist options
  • Save ysimonson/5877284 to your computer and use it in GitHub Desktop.
Save ysimonson/5877284 to your computer and use it in GitHub Desktop.
LinkedIn OAuth2 authentication over tornado
from tornado import auth, httpclient, httputil, escape
import urllib.request
import urllib.parse
import urllib.error
import functools
# Timeout, in seconds, passed as ``request_timeout`` to every outgoing HTTP
# fetch issued by this module.
REQUEST_TIMEOUT = 20.0
class LinkedInMixin(auth.OAuth2Mixin):
    """
    LinkedIn authentication using OAuth2.

    Example usage::

        class LinkedInLoginHandler(LoginHandler, LinkedInMixin):
            @tornado.gen.coroutine
            def get(self):
                code = self.get_argument("code", None)
                redirect_uri = "%s://%s%s" % (
                    self.request.protocol, self.request.host, self.request.path)

                if not code:
                    # Generate a random state. Decode it so the value sent to
                    # LinkedIn (and echoed back as a str query argument) is
                    # text rather than bytes.
                    state = binascii.b2a_hex(os.urandom(15)).decode("ascii")
                    self.set_secure_cookie("linkedin_state", state)

                    yield self.authorize_redirect(
                        redirect_uri=redirect_uri,
                        client_id=self.settings["linkedin_client_id"],
                        extra_params={
                            "response_type": "code",
                            "state": state,
                            "scope": "r_basicprofile r_emailaddress"
                        }
                    )
                    return

                # Validate the state. get_secure_cookie() returns bytes (or
                # None), get_argument() returns str, so decode before comparing.
                expected = self.get_secure_cookie("linkedin_state")
                if expected is None or self.get_argument("state", None) != expected.decode("ascii"):
                    raise tornado.web.HTTPError(400, "Invalid state")

                user_data = yield self.get_authenticated_user(
                    redirect_uri=redirect_uri,
                    client_id=self.settings["linkedin_client_id"],
                    client_secret=self.settings["linkedin_client_secret"],
                    code=code,
                    extra_fields=["formatted-name", "email-address"]
                )

                if not user_data:
                    raise tornado.web.HTTPError(400, "LinkedIn authentication failed")

                # Handle authenticated user
    """

    _OAUTH_ACCESS_TOKEN_URL = "https://www.linkedin.com/uas/oauth2/accessToken?"
    _OAUTH_AUTHORIZE_URL = "https://www.linkedin.com/uas/oauth2/authorization?"
    _OAUTH_NO_CALLBACKS = False

    @auth._auth_return_future  # pylint: disable=protected-access
    def get_authenticated_user(self, redirect_uri, client_id, client_secret, code, callback, extra_fields=None):
        """Exchange an authorization ``code`` for an access token and profile.

        POSTs to the access-token URL, then requests the user's profile
        restricted to ``id`` plus any ``extra_fields``. The result handed to
        ``callback`` is the decoded profile dict with ``access_token`` and
        ``expires_in`` added, or ``None`` if the profile request yielded none.

        Inside this method ``callback`` is used as a future: the helper
        methods below call ``set_result`` / ``set_exception`` on it.
        """
        http = httpclient.AsyncHTTPClient()

        args = {
            "redirect_uri": redirect_uri,
            "code": code,
            "client_id": client_id,
            "client_secret": client_secret,
            "extra_params": {
                "grant_type": "authorization_code"
            }
        }

        # "id" is always requested; callers may ask for more profile fields.
        fields = {"id"}
        if extra_fields:
            fields.update(extra_fields)

        http.fetch(
            self._oauth_request_token_url(**args),
            functools.partial(self._on_access_token, redirect_uri, client_id, client_secret, callback, fields),
            method="POST", body="", request_timeout=REQUEST_TIMEOUT
        )

    def _on_access_token(self, redirect_uri, client_id, client_secret, future, fields, response):  # pylint: disable=unused-argument
        """Handle the access-token response and request the user's profile.

        On an HTTP error the failure is recorded on ``future``. Otherwise the
        JSON body supplies ``access_token`` and ``expires_in``, which are
        forwarded to ``_on_get_user_info`` together with the profile.
        """
        if response.error:
            self._set_error(future, 'LinkedIn auth error (%s): %s' % (response.code, response.body), response)
            return

        args = escape.json_decode(response.body)
        expires_in = args["expires_in"]
        access_token = args["access_token"]

        self.linkedin_request(
            # Sort the field names so the request URL is deterministic
            # (``fields`` is a set, whose iteration order is not).
            path="/v1/people/~:(%s)" % ",".join(sorted(fields)),
            callback=functools.partial(self._on_get_user_info, future, expires_in, access_token),
            access_token=access_token,
        )

    def _on_get_user_info(self, future, expires_in, access_token, user):
        """Resolve ``future`` with the profile, annotated with token details.

        Resolves with ``None`` when ``user`` is ``None``.
        """
        if user is None:
            future.set_result(None)
            return

        user["access_token"] = access_token
        user["expires_in"] = expires_in
        future.set_result(user)

    def _set_error(self, future, message, response):
        """Fail ``future`` with an ``AuthError`` describing ``response``.

        The exception carries the response's ``code``, request ``url`` and
        ``body`` as attributes.
        """
        e = auth.AuthError(message)
        e.code = response.code
        e.url = response.request.url
        e.body = response.body
        future.set_exception(e)

    @auth._auth_return_future  # pylint: disable=protected-access
    def linkedin_request(self, path, callback, method="GET", access_token=None, post_args=None, query_args=None):
        """Issue a request against ``https://api.linkedin.com`` + ``path``.

        ``query_args`` are URL-encoded into the query string, with
        ``access_token`` (when given) added as ``oauth2_access_token``.
        ``post_args`` are URL-encoded into the body for POST/PATCH/PUT
        requests. The decoded JSON response is delivered through ``callback``;
        HTTP errors are reported as ``AuthError``.
        """
        url = "https://api.linkedin.com" + path

        # Build the query parameters
        all_query_args = dict(query_args or {})
        if access_token:
            all_query_args["oauth2_access_token"] = access_token
        if all_query_args:
            url += "?" + urllib.parse.urlencode(all_query_args)

        # Build the request body. Empty bodies must be either set to an empty
        # string or None based on the request method. This is because the
        # Tornado HTTP client aggressively throws errors based on the request
        # method / body content combination.
        #
        # The decision is based on the method of the *outgoing* API call, not
        # on ``self.request.method`` (the incoming browser request), which is
        # unrelated to what is sent to LinkedIn.
        if method in ("POST", "PATCH", "PUT"):
            body = urllib.parse.urlencode(post_args) if post_args else ""
        else:
            body = None

        http = httpclient.AsyncHTTPClient()

        # Ask linkedin to send us JSON on all API calls (not xml)
        headers = httputil.HTTPHeaders({"x-li-format": "json"})

        http.fetch(
            url,
            callback=functools.partial(self._on_linkedin_request, callback),
            method=method, headers=headers, body=body, request_timeout=REQUEST_TIMEOUT
        )

    def _on_linkedin_request(self, future, response):
        """Resolve ``future`` with the decoded JSON body, or fail it on error."""
        if response.error:
            self._set_error(future, "LinkedIn error (%s) when requesting %s: %s" % (response.code, response.request.url, response.body), response)
        else:
            future.set_result(escape.json_decode(response.body))
@david415
Copy link

looks like decent code... Is it tested?

@ysimonson
Copy link
Author

I didn't see this comment earlier. It was tested and used in production until recently. With Tornado 4.x it will no longer work, because `async_callback` has been removed.

@ysimonson
Copy link
Author

Updated the snippet to work on tornado 4.x. I'll continue to update it if other issues arise.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment