Skip to content

Instantly share code, notes, and snippets.

@mgan59
Created October 8, 2010 18:08
Show Gist options
  • Save mgan59/617240 to your computer and use it in GitHub Desktop.
Save mgan59/617240 to your computer and use it in GitHub Desktop.
A code dump of an lxml-based implementation of authenticated reads for python-tumblr
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright 2008 Ryan Cox ( ryan.a.cox@gmail.com ) All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
'''A wrapper library for Tumblr's public web API: http://www.tumblr.com/api'''
__author__ = 'ryan.a.cox@gmail.com'
__version__ = '0.1'
from httplib import HTTPConnection
from urllib2 import Request, urlopen, URLError, HTTPError
from urllib import urlencode, quote
from poster.encode import multipart_encode
from poster.streaminghttp import register_openers
import base64
import re
try:
import simplejson
except ImportError:
from django.utils import simplejson
try:
from lxml import objectify
except ImportError:
raise ImportError('Authenticated_Read requires the lxml library')
# Value sent as the 'generator' field of authenticate/write requests.
GENERATOR = 'python-tumblr'
# Number of posts requested per read call (the 'num' query parameter).
PAGESIZE = 50
class TumblrError(Exception):
    '''General Tumblr error.

    The message stays available as ``msg`` and is also handed to
    ``Exception`` so that ``args`` and ``repr()`` carry it.
    '''

    def __init__(self, msg):
        Exception.__init__(self, msg)
        self.msg = msg

    def __str__(self):
        # __str__ must return a string even if msg is an int or another exception
        return str(self.msg)
class TumblrAuthError(TumblrError):
    '''Error raised for an HTTP 403 response.'''
class TumblrRequestError(TumblrError):
    '''Error raised for an HTTP 400 response.'''
class TumblrIterator(object):
    '''Lazy iterator over a blog's posts using the JSON read API.

    name  -- blog name, used as the sub-domain of tumblr.com
    start -- offset of the first post to return
    max   -- maximum number of posts to hand out in total
    type  -- optional post type filter appended to the query string
    '''

    def __init__(self, name, start, max, type):
        self.name = name
        self.start = start
        self.max = max
        self.type = type
        self.results = None
        # position inside the page currently held in self.results
        self.index = 0
        # posts handed out so far, across all pages (enforces max)
        self.count = 0

    def __iter__(self):
        return self

    def _fetch_page(self):
        '''Request PAGESIZE posts starting at self.start and store the decoded payload in self.results.'''
        url = "http://%s.tumblr.com/api/read/json?start=%s&num=%s" % (self.name, self.start, PAGESIZE)
        if self.type:
            url += "&type=" + self.type
        response = urlopen(url)
        try:
            page = response.read()
        finally:
            response.close()
        # the JSON object is wrapped in other text; keep only the outermost {...}
        m = re.match("^.*?({.*}).*$", page, re.DOTALL | re.MULTILINE | re.UNICODE)
        if m is None:
            raise TumblrError("Unexpected response from %s" % url)
        self.results = simplejson.loads(m.group(1))

    def next(self):
        '''
        Return the next post, fetching a new page from Tumblr when needed.

        On the first call self.results is empty, so a service call is made and
        the decoded payload is stored in self.results. Subsequent calls walk
        through that page without another request. Once self.index has caught
        up with the number of posts in the page, the next page is requested,
        using the number of posts consumed so far as the new 'start'.

        StopIteration is raised when a page comes back without posts or when
        self.max posts have been handed out. The limit is counted across all
        pages, not per page.
        '''
        if self.count >= self.max:
            raise StopIteration
        if not self.results or (self.index == len(self.results['posts'])):
            self.start += self.index
            self.index = 0
            self._fetch_page()
        posts = self.results['posts']
        if len(posts) == 0:
            raise StopIteration
        post = posts[self.index]
        self.index += 1
        self.count += 1
        return post

    # Python 3 spelling of the iterator protocol
    __next__ = next
class TumblrIteratorAuthenticated(TumblrIterator):
    '''Iterator over a blog's posts using the authenticated XML read API.

    Each item is a plain dict built from a <post> node. Its 'display_status'
    entry is 'private' or 'public', taken from the node's 'private' attribute.
    '''

    def __init__(self, name, email, password, start, max, type, tags):
        self.email = email
        self.password = password
        self.tags = tags
        super(TumblrIteratorAuthenticated, self).__init__(name, start, max, type)
        self.results = []
        # posts handed out so far, across all pages (enforces max)
        self.count = 0

    def _child_text(self, post_node, tag):
        '''Return the text of the first child named tag, or None if the post has no such child.'''
        child = post_node.find(tag)
        if child is None:
            return None
        return unicode(child)

    def _post_to_dict(self, post_node):
        '''Convert one objectified <post> node into the dict handed to callers.'''
        display_status = 'public'
        ## if private is True and not None then set display_status
        if post_node.get('private'):
            display_status = 'private'
        tags = []
        ## compare with None: find() returns None when there is no <tag> child
        if post_node.find('tag') is not None:
            for tag_item in post_node['tag']:
                tags.append(unicode(tag_item))
        post_obj = {
            'id': post_node.get('id'),
            'url': post_node.get('url'),
            'url-with-slug': post_node.get('url-with-slug'),
            'type': post_node.get('type'),
            'format': post_node.get('format'),
            'display_status': display_status,
            'slug': post_node.get('slug'),
            'tags': tags,
            'date': post_node.get('date'),
            'date-gmt': post_node.get('date-gmt'),
            'unix-timestamp': post_node.get('unix-timestamp')
        }
        if post_node.get('type') == 'regular':
            ## either child may be absent; report None instead of failing
            post_obj['regular-title'] = self._child_text(post_node, 'regular-title')
            post_obj['regular-body'] = self._child_text(post_node, 'regular-body')
        #if post_node.get('type') == 'photo':
        ## not implemented because getting the photo-url is going to require lxml magic
        #post_obj['photo-caption'] = unicode(post_node['caption'])
        return post_obj

    def _fetch_page(self):
        '''POST the credentials to the read API and store the converted posts in self.results.'''
        url = "http://%s.tumblr.com/api/read/?start=%s&num=%s" % (self.name, self.start, PAGESIZE)
        if self.type:
            url += "&type=" + self.type
        if self.tags:
            url += "&tags=" + self.tags
        ## need to encode params for url open to do POST for authenticated read
        params = urlencode({'password': self.password, 'email': self.email})
        response = urlopen(url, params)
        try:
            xml_string = response.read()
        finally:
            response.close()
        xml_doc = objectify.fromstring(xml_string)
        if hasattr(xml_doc.posts, 'post'):
            self.results = [self._post_to_dict(node) for node in xml_doc.posts.post]
        else:
            self.results = []

    def next(self):
        '''
        Return the next post dict, fetching a new page when needed.

        Paging works as in TumblrIterator.next, but each page is requested
        with a POST carrying email/password and is parsed from XML with
        lxml.objectify. The XML form is used because its post nodes carry the
        'private' attribute that is reported here as 'display_status'.

        StopIteration is raised when a page has no posts or when self.max
        posts have been handed out in total.
        '''
        if self.count >= self.max:
            raise StopIteration
        if not self.results or (self.index == len(self.results)):
            self.start += self.index
            self.index = 0
            self._fetch_page()
        if len(self.results) == 0:
            raise StopIteration
        post = self.results[self.index]
        self.index += 1
        self.count += 1
        return post

    # Python 3 spelling of the iterator protocol
    __next__ = next
class Api(object):
    '''Client for reading from and writing to a single Tumblr blog.

    name     -- blog name (sub-domain of tumblr.com, also sent as 'group')
    email    -- account e-mail, needed for writes and authenticated reads
    password -- account password
    private, date, tags, format -- optional values sent with authenticate
                and write requests
    '''

    def __init__(self, name, email=None, password=None, private=None, date=None, tags=None, format=None):
        self.name = name
        self.is_authenticated = False
        self.email = email
        self.password = password
        self.private = private
        self.date = date
        self.tags = tags
        self.format = format

    def auth_check(self):
        '''Verify the credentials once; later calls return immediately.

        Raises TumblrAuthError on HTTP 403, TumblrRequestError on HTTP 400 and
        TumblrError for any other failure.
        '''
        if self.is_authenticated:
            return
        url = 'http://www.tumblr.com/api/write'
        values = {
            'action': 'authenticate',
            'generator': GENERATOR,
            'email': self.email,
            'password': self.password,
            'private': self.private,
            'group': self.name,
            'date': self.date,
            'tags': self.tags,
            'format': self.format
        }
        # urlencode() would otherwise send the literal text "None" for unset options
        values = dict((key, value) for key, value in values.items() if value is not None)
        req = Request(url, urlencode(values))
        try:
            response = urlopen(req)
            try:
                page = response.read()
            finally:
                response.close()
        except HTTPError as e:
            if 403 == e.code:
                raise TumblrAuthError(str(e))
            if 400 == e.code:
                raise TumblrRequestError(str(e))
            # any other status is still a failed authentication
            raise TumblrError(str(e))
        except Exception as e:
            raise TumblrError(str(e))
        self.url = page
        self.is_authenticated = True

    def dashboard(self):
        '''Log in through the web form, fetch the /iphone page, print it and return its body.'''
        self.domain = 'http://www.tumblr.com'
        self.url = self.domain + '/login'
        self.params = urlencode({'email': self.email, 'password': self.password})
        self.headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"}
        self.response = self._getcookie(self.domain, self.url, self.headers, self.params)
        self.cookie = self._cookie(self.response)
        self.response = self._getcookie(self.domain, self.url, self.headers, self.params, self.cookie)
        self.url_iphone = 'http://www.tumblr.com/iphone'
        self.data = self._getcookie(self.domain, self.url_iphone, self.headers, self.params, self.cookie)
        page = self.data.read()
        print(page)
        return page

    def _cookie_token(self, raw, name):
        '''Return the "name=value;" token of raw (up to the next space), or '' if name is absent.'''
        begin = raw.find(name + '=')
        if begin == -1:
            return ''
        end = raw.find(' ', begin)
        if end == -1:
            end = len(raw)
        return raw[begin:end]

    def _cookie(self, response):
        '''Build the Cookie header value from the pfu/pfp/pfe tokens of a Set-Cookie header.

        Raises TumblrError when the response carries no Set-Cookie header.
        '''
        raw = response.getheader('set-cookie')
        if not raw:
            raise TumblrError("Login response did not set any cookies")
        # each token is located on its own, so their order in the header does not matter
        self.pfu = self._cookie_token(raw, 'pfu')
        self.pfp = self._cookie_token(raw, 'pfp')
        self.pfe = self._cookie_token(raw, 'pfe')
        self.cookie = self.pfu + self.pfp + self.pfe
        return self.cookie

    def _getcookie(self, domain, url, headers, params=None, cookie=None):
        '''POST params to url on domain and return the HTTP response object.'''
        # HTTPConnection wants a bare host name, not a URL with a scheme
        host = domain.split('://', 1)[-1].rstrip('/')
        self.session = HTTPConnection(host, 80)
        # work on a copy so the caller's header dict is not modified
        headers = dict(headers)
        if cookie:
            headers['Cookie'] = cookie
        #headers['Referer'] = 'http://www.tumblr.com/iphone'
        self.session.request('POST', url, params, headers)
        self.response = self.session.getresponse()
        #print self.response.status, self.response.reason
        return self.response

    def write_regular(self, title=None, body=None, **args):
        '''Create a regular (text) post; at least one of title/body is required.'''
        if title:
            args['title'] = title
        if body:
            args['body'] = body
        args = self._fixnames(args)
        if 'title' not in args and 'body' not in args:
            raise TumblrError("Must supply either body or title argument")
        self.auth_check()
        args['type'] = 'regular'
        return self._write(args)

    def write_photo(self, source=None, data=None, caption=None, click=None, **args):
        '''Create a photo post from a source URL or from a local file path (data).

        Exactly one of source/data must be given, otherwise TumblrError is raised.
        '''
        # validate before touching the file system
        if source and data:
            raise TumblrError("Must NOT supply both source and data arguments")
        if not source and not data:
            raise TumblrError("Must supply source or data argument")
        args['caption'] = caption
        args['click-through-url'] = click
        args = self._fixnames(args)
        args['type'] = 'photo'
        if source:
            args['source'] = source
            self.auth_check()
            return self._write(args)
        upload = open(data, 'rb')
        try:
            args['data'] = upload
            self.auth_check()
            return self._write(args)
        finally:
            # the file is read while the request is sent, so close it only afterwards
            upload.close()

    def write_quote(self, quote=None, source=None, **args):
        '''Create a quote post; quote is required, source is optional.'''
        if quote:
            args['quote'] = quote
            args['source'] = source
        args = self._fixnames(args)
        if 'quote' not in args:
            raise TumblrError("Must supply quote arguments")
        self.auth_check()
        args['type'] = 'quote'
        return self._write(args)

    def write_link(self, name=None, url=None, description=None, **args):
        '''Create a link post; url is required, name and description are optional.'''
        if url:
            args['name'] = name
            args['url'] = url
            args['description'] = description
        args = self._fixnames(args)
        if 'url' not in args:
            raise TumblrError("Must supply url argument")
        self.auth_check()
        args['type'] = 'link'
        return self._write(args)

    def write_conversation(self, title=None, conversation=None, **args):
        '''Create a conversation post; conversation is required, title is optional.'''
        if conversation:
            args['title'] = title
            args['conversation'] = conversation
        args = self._fixnames(args)
        if 'conversation' not in args:
            raise TumblrError("Must supply conversation argument")
        self.auth_check()
        args['type'] = 'conversation'
        return self._write(args)

    def write_audio(self, data=None, source=None, caption=None, **args):
        '''Create an audio post from a local file path (data) or from content downloaded from source.'''
        if not data and not source:
            raise TumblrError("Must supply data argument")
        args['caption'] = caption
        args = self._fixnames(args)
        args['type'] = 'audio'
        if data:
            upload = open(data, 'rb')
            try:
                args['data'] = upload
                self.auth_check()
                return self._write(args)
            finally:
                # the file is read while the request is sent, so close it only afterwards
                upload.close()
        remote = urlopen(source)
        try:
            args['data'] = remote.read()
        finally:
            remote.close()
        self.auth_check()
        return self._write(args)

    def write_video(self, embed=None, caption=None, **args):
        '''Create a video post from embed code, or from a 'data' keyword argument.'''
        if embed:
            args['embed'] = embed
        args['caption'] = caption
        args = self._fixnames(args)
        if 'embed' in args and 'data' in args:
            raise TumblrError("Must NOT supply both embed and data arguments")
        if 'embed' not in args and 'data' not in args:
            raise TumblrError("Must supply embed or data argument")
        self.auth_check()
        args['type'] = 'video'
        return self._write(args)

    def _fixnames(self, args):
        '''Rename keys in place, replacing '_' with '-', and return the same dict.'''
        # iterate over a snapshot: the dict is modified inside the loop
        for key in list(args):
            if '_' in key:
                args[key.replace('_', '-')] = args.pop(key)
        return args

    def _write(self, params, headers=None):
        '''Send a write request and return the created post as read back via read().

        Raises TumblrError when the server does not answer with HTTP 201.
        '''
        self.auth_check()
        url = 'http://www.tumblr.com/api/write'
        register_openers()
        params['email'] = self.email
        params['password'] = self.password
        params['private'] = self.private
        params['generator'] = GENERATOR
        params['group'] = self.name
        params['date'] = self.date
        params['tags'] = self.tags
        params['format'] = self.format
        if not params['date']:
            params['date'] = 'now'
        if not params['tags']:
            del params['tags']
        if not params['format']:
            del params['format']
        # unset options (caption, source, private, ...) would be sent as the text "None"
        for key in [k for k, v in params.items() if v is None]:
            del params[key]
        if 'data' not in params:
            data = urlencode(params)
        else:
            data, headers = multipart_encode(params)
        if headers:
            req = Request(url, data, headers)
        else:
            req = Request(url, data)
        #print params
        try:
            response = urlopen(req)
        except HTTPError as e:
            # older urllib2 reports "201 Created" as an HTTPError
            if 201 != e.code:
                raise TumblrError(e.read())
            newid = e.read()
        else:
            # newer urllib2 returns 2xx responses normally
            try:
                status = response.getcode()
                newid = response.read()
            finally:
                response.close()
            if 201 != status:
                raise TumblrError("Error writing post")
        return self.read(id=newid.strip())

    def _first_post(self, page):
        '''Decode a JSON read response and return its first post, or None when it has no posts.'''
        # the JSON object is wrapped in other text; keep only the outermost {...}
        m = re.match("^.*?({.*}).*$", page, re.DOTALL | re.MULTILINE | re.UNICODE)
        if m is None:
            raise TumblrError("Unexpected response from the Tumblr read API")
        results = simplejson.loads(m.group(1))
        if len(results['posts']) == 0:
            return None
        return results['posts'][0]

    def authenticated_read(self, id=None, start=0, max=2**31-1, type=None, tags=None):
        '''
        A clone of the read method, except that it uses POST and includes
        email/password to authenticate the read.

        With id, returns that single post (or None if it is not found).
        Without id, returns a TumblrIteratorAuthenticated over the posts.
        '''
        if id:
            url = "http://%s.tumblr.com/api/read/json" % (self.name)
            ## need to encode params for urlopen to do POST for authenticated read
            params = urlencode({'email': self.email, 'password': self.password, 'start': start, 'id': id})
            response = urlopen(url=url, data=params)
            try:
                page = response.read()
            finally:
                response.close()
            return self._first_post(page)
        else:
            return TumblrIteratorAuthenticated(self.name, self.email, self.password, start, max, type, tags)

    def read(self, id=None, start=0, max=2**31-1, type=None):
        '''Read posts without authentication.

        With id, returns that single post (or None if it is not found).
        Without id, returns a TumblrIterator over the posts.
        '''
        if id:
            url = "http://%s.tumblr.com/api/read/json?id=%s" % (self.name, id)
            response = urlopen(url)
            try:
                page = response.read()
            finally:
                response.close()
            return self._first_post(page)
        else:
            return TumblrIterator(self.name, start, max, type)
# This module is a library; running it directly does nothing.
if __name__ == "__main__":
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment