Skip to content

Instantly share code, notes, and snippets.

@keokilee
Created January 18, 2013 20:00
Show Gist options
  • Save keokilee/4567985 to your computer and use it in GitHub Desktop.
Save keokilee/4567985 to your computer and use it in GitHub Desktop.
A very limited helper class for working with the Tumblr API. Only includes a posts method and a reblog method.
import json
from urllib import urlencode
from urllib2 import urlopen
from urlparse import parse_qsl
import oauth2 as oauth
# Tumblr OAuth access-token endpoint. HTTPS is used because the xAuth exchange
# in this module posts the account username and password in the request body.
ACCESS_TOKEN_URL = 'https://www.tumblr.com/oauth/access_token'
class TumblrOAuth(object):
    """Very limited helper for the Tumblr API: list posts and reblog a post.

    Unauthenticated reads pass the consumer key as ``api_key``.  Writes are
    signed through the ``oauth2`` library; when no access token was supplied
    to the constructor, one is requested from ``ACCESS_TOKEN_URL`` with the
    stored username/password the first time a signed request is made.
    """

    # Root of the blog endpoints; HTTPS so keys and signed requests are not
    # sent in clear text.
    _API_ROOT = 'https://api.tumblr.com/v2/blog'

    def __init__(self, *args, **kwargs):
        """Initialize the API helper with Tumblr credentials.

        Positional arguments are accepted but ignored.  Recognized keywords:

        consumer_key, consumer_secret
            Application credentials used to build the OAuth consumer.
        token, token_secret
            An existing access token; only used when both are given.
        username, password
            Account credentials, needed for xAuth when no token is given.
        """
        self.key = kwargs.get('consumer_key')
        self.secret = kwargs.get('consumer_secret')
        self.consumer = oauth.Consumer(self.key, self.secret)
        # ``token`` is deliberately left unset when not supplied; it is
        # fetched lazily by _make_oauth_request().
        if 'token' in kwargs and 'token_secret' in kwargs:
            self.token = oauth.Token(kwargs['token'], kwargs['token_secret'])
        self.username = kwargs.get('username')  # Required for xAuth
        self.password = kwargs.get('password')  # Required for xAuth

    def posts(self, hostname):
        """Return the ``response.posts`` list for the blog at ``hostname``."""
        endpoint = '%s/%s/posts?api_key=%s' % (
            self._API_ROOT, hostname, self.key)
        payload = self._make_api_request(endpoint)
        return payload['response']['posts']

    def reblog_post(self, post, dest_hostname):
        """Reblog ``post`` onto the blog ``dest_hostname``.

        ``post`` must be a mapping providing ``'id'``, ``'reblog_key'`` and
        ``'type'``; only those three values are sent.  Returns the raw
        response body from the signed request.
        """
        endpoint = '%s/%s/post/reblog' % (self._API_ROOT, dest_hostname)
        params = {
            'id': post['id'],
            'reblog_key': post['reblog_key'],
            'type': post['type'],
        }
        return self._make_oauth_request(endpoint, data=params)

    def _make_api_request(self, url, method="GET"):
        """Use this if the request does not require OAuth.

        Sends ``method`` to ``url`` and returns the JSON-decoded body.
        """
        # Imported locally: urllib2 is Python 2 only, like this module's
        # top-level imports.
        from urllib2 import Request

        request = Request(url)
        # urlopen()'s second positional argument is the request body, not the
        # HTTP verb; passing the verb there turned every call into a POST.
        # Overriding get_method is how urllib2 lets the verb be chosen.
        request.get_method = lambda: method
        response = urlopen(request)
        try:
            body = response.read()
        finally:
            # Release the connection even if reading fails.
            response.close()
        return json.loads(body)

    def _make_oauth_request(self, url, method="POST", data=None):
        """Send an OAuth-signed request and return the raw response body.

        ``data``, when given, is form-encoded and sent as the request body.
        An access token is obtained via xAuth first if none is available.
        """
        if getattr(self, 'token', None) is None:
            self.token = self._get_xauth_token()
        client = oauth.Client(self.consumer, self.token)
        request_kwargs = {'method': method}
        if data is not None:
            request_kwargs['body'] = urlencode(data)
        _resp, content = client.request(url, **request_kwargs)
        return content

    def _get_xauth_token(self):
        """Exchange the stored username/password for an access token (xAuth).

        Returns an ``oauth.Token``.  Raises ``KeyError`` when the response
        body does not contain ``oauth_token`` / ``oauth_token_secret``.
        """
        client = oauth.Client(self.consumer)
        # NOTE(review): kept from the original; presumably only matters if
        # the server challenges for HTTP auth -- confirm it is needed.
        client.add_credentials(self.username, self.password)
        # Call the setter; assigning to it replaced the method and never
        # actually configured a signature method.
        client.set_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        params = {
            'x_auth_username': self.username,
            'x_auth_password': self.password,
            'x_auth_mode': 'client_auth',
        }
        _resp, body = client.request(
            ACCESS_TOKEN_URL, method="POST", body=urlencode(params))
        access_token = dict(parse_qsl(body))
        return oauth.Token(
            access_token['oauth_token'], access_token['oauth_token_secret'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment