Skip to content

Instantly share code, notes, and snippets.

@fanzeyi
Created November 17, 2011 04:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fanzeyi/1372391 to your computer and use it in GitHub Desktop.
Save fanzeyi/1372391 to your computer and use it in GitHub Desktop.
Fanfou OAuth for Tornado
# -*- coding: utf-8 -*-
# author: Zeray Rice <fanzeyi1994@gmail.com>
import urllib
import logging
import tornado.auth
from tornado import escape
from tornado import httpclient
class FanfouMixin(tornado.auth.OAuthMixin):
_OAUTH_REQUEST_TOKEN_URL = "http://fanfou.com/oauth/request_token"
_OAUTH_ACCESS_TOKEN_URL = "http://fanfou.com/oauth/access_token"
_OAUTH_AUTHENTICATE_URL = "http://fanfou.com/oauth/authenticate"
_OAUTH_NO_CALLBACKS = False
def authenticate_redirect(self):
http = httpclient.AsyncHTTPClient()
http.fetch(self._oauth_request_token_url(), self.async_callback(self._on_request_token, self._OAUTH_AUTHENTICATE_URL, None))
def fanfou_request(self, path, callback, access_token=None,
post_args=None, **args):
url = "http://api.fanfou.com" + path + ".json"
if access_token:
all_args = {}
all_args.update(args)
all_args.update(post_args or {})
method = "POST" if post_args is not None else "GET"
oauth = self._oauth_request_parameters(
url, access_token, all_args, method=method)
args.update(oauth)
if args: url += "?" + urllib.urlencode(args)
callback = self.async_callback(self._on_fanfou_request, callback)
http = httpclient.AsyncHTTPClient()
if post_args is not None:
http.fetch(url, method="POST", body=urllib.urlencode(post_args),
callback=callback)
else:
http.fetch(url, callback=callback)
def _on_fanfou_request(self, callback, response):
if response.error:
logging.warning("Error response %s fetching %s", response.error,
response.request.url)
callback(None)
return
callback(escape.json_decode(response.body))
def _oauth_consumer_token(self):
self.require_setting("fanfou_consumer_key", "Fanfou OAuth")
self.require_setting("fanfou_consumer_secret", "Fanfou OAuth")
return dict(
key = self.settings["fanfou_consumer_key"],
secret = self.settings["fanfou_consumer_secret"])
def _oauth_get_user(self, access_token, callback):
callback = self.async_callback(self._parse_user_response, callback)
self.fanfou_request(
"/users/show",
access_token=access_token, callback=callback)
def _parse_user_response(self, callback, user):
if user:
user["username"] = user["screen_name"]
callback(user)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment