Created
November 17, 2011 04:53
-
-
Save fanzeyi/1372391 to your computer and use it in GitHub Desktop.
Fanfou OAuth for Tornado
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# author: Zeray Rice <fanzeyi1994@gmail.com>
import urllib | |
import logging | |
import tornado.auth | |
from tornado import escape | |
from tornado import httpclient | |
class FanfouMixin(tornado.auth.OAuthMixin):
    """Fanfou OAuth support for Tornado request handlers.

    The application settings must provide ``fanfou_consumer_key`` and
    ``fanfou_consumer_secret``.  The token exchange itself is driven by the
    inherited ``tornado.auth.OAuthMixin``; this class supplies the Fanfou
    endpoints and a helper for signed API requests.
    """

    _OAUTH_REQUEST_TOKEN_URL = "http://fanfou.com/oauth/request_token"
    _OAUTH_ACCESS_TOKEN_URL = "http://fanfou.com/oauth/access_token"
    _OAUTH_AUTHENTICATE_URL = "http://fanfou.com/oauth/authenticate"
    _OAUTH_NO_CALLBACKS = False

    def authenticate_redirect(self):
        """Start the flow by fetching a request token.

        The response is passed to the inherited ``_on_request_token``
        together with the authenticate URL and ``None``.
        """
        on_token = self.async_callback(
            self._on_request_token, self._OAUTH_AUTHENTICATE_URL, None)
        client = httpclient.AsyncHTTPClient()
        client.fetch(self._oauth_request_token_url(), on_token)

    def fanfou_request(self, path, callback, access_token=None,
                       post_args=None, **args):
        """Fetch ``http://api.fanfou.com<path>.json`` asynchronously.

        ``callback`` receives the decoded JSON body, or ``None`` when the
        response carries an error.  If ``access_token`` is truthy, OAuth
        parameters are computed and added to the query string.  The request
        is a POST with ``post_args`` as the url-encoded body whenever
        ``post_args`` is not ``None`` (an empty dict still means POST);
        otherwise it is a GET.  Extra keyword arguments become query
        parameters.
        """
        is_post = post_args is not None
        url = "http://api.fanfou.com" + path + ".json"

        if access_token:
            # The signature covers query and body arguments together, but
            # only the OAuth parameters are merged back into the query.
            signed_args = dict(args)
            signed_args.update(post_args or {})
            args.update(self._oauth_request_parameters(
                url, access_token, signed_args,
                method="POST" if is_post else "GET"))

        if args:
            url = url + "?" + urllib.urlencode(args)

        on_response = self.async_callback(self._on_fanfou_request, callback)
        client = httpclient.AsyncHTTPClient()
        fetch_kwargs = {"callback": on_response}
        if is_post:
            fetch_kwargs["method"] = "POST"
            fetch_kwargs["body"] = urllib.urlencode(post_args)
        client.fetch(url, **fetch_kwargs)

    def _on_fanfou_request(self, callback, response):
        """Pass the decoded JSON body to ``callback``, or ``None`` on error."""
        if not response.error:
            callback(escape.json_decode(response.body))
            return
        logging.warning("Error response %s fetching %s", response.error,
                        response.request.url)
        callback(None)

    def _oauth_consumer_token(self):
        """Return the consumer ``key``/``secret`` dict read from settings."""
        for setting in ("fanfou_consumer_key", "fanfou_consumer_secret"):
            self.require_setting(setting, "Fanfou OAuth")
        return {
            "key": self.settings["fanfou_consumer_key"],
            "secret": self.settings["fanfou_consumer_secret"],
        }

    def _oauth_get_user(self, access_token, callback):
        """Request ``/users/show`` with ``access_token`` and pass the parsed
        result to ``callback``."""
        on_user = self.async_callback(self._parse_user_response, callback)
        self.fanfou_request(
            "/users/show", access_token=access_token, callback=on_user)

    def _parse_user_response(self, callback, user):
        """Copy ``screen_name`` to ``username`` on a truthy ``user``, then
        pass ``user`` (possibly ``None``) to ``callback``."""
        if user:
            user["username"] = user["screen_name"]
        callback(user)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment