Created
January 18, 2013 20:00
-
-
Save keokilee/4567985 to your computer and use it in GitHub Desktop.
A very limited helper class for working with the Tumblr API. Only includes a posts method and a reblog method.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
from urllib import urlencode
from urllib2 import Request, urlopen
from urlparse import parse_qsl

import oauth2 as oauth

# Endpoint for exchanging credentials for an OAuth access token. HTTPS is
# required here: the xAuth flow POSTs the user's plaintext username and
# password to this URL.
ACCESS_TOKEN_URL = 'https://www.tumblr.com/oauth/access_token'
class TumblrOAuth(object):
    """A very limited helper for the Tumblr v2 API.

    Only two operations are exposed: ``posts`` (read a blog's posts using
    the consumer key as API key) and ``reblog_post`` (an OAuth-signed POST).
    If no access token was supplied at construction time, one is requested
    via xAuth from the stored username/password on the first signed request.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the API helper with Tumblr credentials.

        Positional arguments are accepted but ignored. Recognized keywords:

        consumer_key, consumer_secret -- the application's OAuth consumer pair.
        token, token_secret -- an existing access token; used only when
            BOTH are given.
        username, password -- account credentials, required for xAuth when
            no token pair is supplied.
        """
        self.key = kwargs.get('consumer_key', None)
        self.secret = kwargs.get('consumer_secret', None)
        self.consumer = oauth.Consumer(self.key, self.secret)

        # Always define the attribute so later code can test it directly;
        # None means "not yet obtained" and triggers the xAuth exchange.
        self.token = None
        if 'token' in kwargs and 'token_secret' in kwargs:
            self.token = oauth.Token(kwargs['token'], kwargs['token_secret'])

        self.username = kwargs.get('username', None)  # Required for xAuth
        self.password = kwargs.get('password', None)  # Required for xAuth

    def posts(self, hostname):
        """Return the list of posts for the blog at ``hostname``.

        The request is unsigned; the consumer key is sent as ``api_key``.
        Returns the ``response.posts`` member of the decoded JSON reply.
        """
        endpoint = 'https://api.tumblr.com/v2/blog/%s/posts?api_key=%s' % (
            hostname, self.key)
        payload = self._make_api_request(endpoint)
        return payload['response']['posts']

    def reblog_post(self, post, dest_hostname):
        """Reblog ``post`` onto the blog at ``dest_hostname``.

        ``post`` is a mapping that must contain ``id``, ``reblog_key`` and
        ``type`` (as found in the items returned by ``posts``); any other
        keys are ignored. Returns the raw response body of the signed request.
        """
        endpoint = 'https://api.tumblr.com/v2/blog/%s/post/reblog' % (
            dest_hostname,)
        params = {
            'id': post['id'],
            'reblog_key': post['reblog_key'],
            'type': post['type'],
        }
        return self._make_oauth_request(endpoint, data=params)

    def _make_api_request(self, url, method="GET"):
        """Use this if the request does not require OAuth.

        Performs an HTTP request with the given ``method`` and returns the
        JSON-decoded response body.
        """
        request = Request(url)
        # urlopen()'s second positional argument is the POST *body*, not the
        # verb, so the method must be set on the Request itself. urllib2
        # otherwise infers GET/POST from whether a body is present.
        request.get_method = lambda: method
        response = urlopen(request)
        try:
            body = response.read()
        finally:
            # Release the connection even if reading the body fails.
            response.close()
        return json.loads(body)

    def _make_oauth_request(self, url, method="POST", data=None):
        """Send an OAuth-signed request and return the response body.

        ``data``, when given, is a mapping that is form-encoded and sent as
        the request body. If no access token is available yet, one is first
        obtained through ``_get_xauth_token`` and cached on the instance.
        """
        if getattr(self, 'token', None) is None:
            self.token = self._get_xauth_token()

        client = oauth.Client(self.consumer, self.token)
        request_kwargs = {'method': method}
        if data is not None:
            request_kwargs['body'] = urlencode(data)

        # Only the body is returned; the response headers are discarded.
        _headers, content = client.request(url, **request_kwargs)
        return content

    def _get_xauth_token(self):
        """Exchange the stored username/password for an access token (xAuth).

        Returns an ``oauth.Token``. Raises ``KeyError`` if the server's reply
        does not contain ``oauth_token`` and ``oauth_token_secret`` (for
        example, when the credentials are rejected).
        """
        client = oauth.Client(self.consumer)
        # NOTE(review): kept from the original implementation; this registers
        # the credentials with the underlying HTTP client in addition to
        # sending them as xAuth parameters below -- confirm it is still needed.
        client.add_credentials(self.username, self.password)
        # Call the setter; assigning to the attribute would replace the
        # method itself rather than select the signature method.
        client.set_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        params = {
            'x_auth_username': self.username,
            'x_auth_password': self.password,
            'x_auth_mode': 'client_auth',
        }
        _headers, token = client.request(
            ACCESS_TOKEN_URL, method="POST", body=urlencode(params))
        access_token = dict(parse_qsl(token))
        return oauth.Token(access_token['oauth_token'],
                           access_token['oauth_token_secret'])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment