Created
December 7, 2010 06:58
-
-
Save seanmonstar/731544 to your computer and use it in GitHub Desktop.
Makes a backup of your tumblr blog.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import sys | |
import urllib2 | |
from xml.dom import minidom | |
from datetime import datetime | |
def url_request(url):
    """Fetch ``url`` and return the raw response body, or ``None`` on failure.

    Errors raised while opening the URL are reported on stdout instead of
    being propagated, so callers only need to test the return value for
    truthiness.
    """
    try:
        page = urllib2.urlopen(url)
    except urllib2.URLError as ex:
        # HTTPError is a subclass of URLError, so this one clause covers both.
        print('%s: %s' % (ex, url))
        return None
    try:
        return page.read()
    finally:
        # Release the connection even if read() fails part-way through.
        page.close()
def getNodeText(node):
    """Return the concatenated text found beneath ``node``.

    Text nodes contribute their value, element nodes contribute their own
    text recursively, and every other node type is skipped.
    """
    def fragments():
        for kid in node.childNodes:
            kind = kid.nodeType
            if kind == kid.ELEMENT_NODE:
                yield getNodeText(kid)
            elif kind == kid.TEXT_NODE:
                yield kid.nodeValue

    return ''.join(fragments())
# Maps the Content-Type reported for a downloaded image to the file
# extension used when the image is stored on disk.
FILE_TYPES = dict([
    ('image/png', 'png'),
    ('image/jpg', 'jpg'),
    ('image/jpeg', 'jpg'),
    ('image/gif', 'gif'),
])
class Tumblr(object):
    """Incremental backup of one Tumblr blog using its ``/api/read`` XML feed.

    The archive lives in a directory named after the blog, next to this
    script: ``<site>/<site>.xml`` holds the posts and ``<site>/images/``
    holds the images of photo posts.
    """

    # Merged XML document: the archive loaded from disk plus posts fetched
    # during this run. None until something has been loaded or fetched.
    dom = None
    # True until an existing archive has been loaded from disk.
    firstFetch = True
    # Post count recorded in the archive that was loaded (0 if none).
    oldTotal = 0
    # Post count reported by the API; 0 until the first successful read.
    totalPosts = 0
    # Number of posts expected from the API this run; the initial value is
    # only a placeholder that is replaced after the first successful read.
    postsToFetch = 50
    # Highest post id already present in the loaded archive.
    latestId = 0

    def __init__(self, site, email = None, password = None):
        """Remember the blog subdomain; ``email``/``password`` are only stored."""
        self.site = site
        self.email = email
        self.password = password

    def read(self, start = 0, num = 50):
        """Fetch one page of posts and merge it into ``self.dom``.

        ``start`` and ``num`` are passed straight to the API as the post
        offset and page size. Returns the number of posts added to the
        archive by this call (0 when the request failed or returned nothing
        new).

        Raises IndexError when the response (or the loaded archive) has no
        ``<posts>`` element, and whatever ``minidom.parseString`` raises for
        malformed XML.
        """
        url = 'http://%s.tumblr.com/api/read?start=%s&num=%s' % (self.site, start, num)
        raw_xml = url_request(url)
        if not raw_xml:
            return 0

        document = minidom.parseString(raw_xml)
        fetchedPostsEl = document.getElementsByTagName('posts')[0]

        if self.dom is None:
            # No previous backup and nothing fetched yet: the response
            # itself becomes the archive document.
            self.dom = document
            postsRetrieved = len(document.getElementsByTagName('post'))
        else:
            postsRetrieved = self._mergePosts(document)

        if not self.totalPosts:
            # The total must come from the fresh response; the archived
            # value is the *old* total and would make postsToFetch zero.
            self.totalPosts = int(fetchedPostsEl.getAttribute('total'))
            self.postsToFetch = self.totalPosts - self.oldTotal
            # Keep the archive's recorded total in step with what it will
            # contain, so the next run computes the right difference.
            archivePostsEl = self.dom.getElementsByTagName('posts')[0]
            archivePostsEl.setAttribute('total', str(self.totalPosts))
        return postsRetrieved

    def _mergePosts(self, document):
        """Copy posts newer than ``latestId`` from ``document`` into the archive.

        Returns the number of posts that were copied.
        """
        archivePostsEl = self.dom.getElementsByTagName('posts')[0]
        imported = 0
        for node in document.getElementsByTagName('post'):
            if int(node.getAttribute('id')) <= self.latestId:
                continue
            # importNode returns the copy owned by self.dom; that copy, not
            # the foreign node, is what must be appended.
            archivePostsEl.appendChild(self.dom.importNode(node, True))
            imported += 1
        return imported

    def images(self):
        """Download the image of every photo post in the archive."""
        if self.dom is None:
            # Nothing was loaded or fetched, so there is nothing to scan.
            return
        self.mkdir('images')
        # there are <img src="media.tumblr.com/asdasdfadfa"
        # and image posts: post[type=photo]/photo-url[0]
        # Only the second kind is handled here.
        for post in self.dom.getElementsByTagName('post'):
            if post.getAttribute('type') != 'photo':
                continue
            photoUrls = post.getElementsByTagName('photo-url')
            if not photoUrls:
                continue
            # first one seems to always be the largest (original)
            self.image(getNodeText(photoUrls[0]).strip())

    def image(self, url):
        """Download the image at ``url`` unless it is already on disk."""
        img = Image(self, url)
        if not img.exists():
            print("%s doesn't exist" % img)
            img.download()
        else:
            print("%s EXISTS" % img)

    def output(self):
        """Return the archive as UTF-8 encoded pretty-printed XML.

        Returns the placeholder string ``'(nothing)'`` when there is no
        document.
        """
        if self.dom:
            return self.dom.toprettyxml(encoding='utf-8')
        else:
            return '(nothing)'

    def backup(self, targetDir=None):
        '''
        Reads all posts, then outputs XML into file.

        ``targetDir`` is accepted but currently unused.
        '''
        self.load()
        posts = 0
        while posts < self.postsToFetch:
            print('Fetching posts...')
            newPosts = self.read(posts, min(50, self.postsToFetch - posts))
            posts += newPosts
            print('Received %s/%s new posts...' % (posts, self.postsToFetch))
            if not newPosts:
                # A failed request or a page with nothing new would
                # otherwise repeat the same request forever.
                break
        self.mkdir()
        # TODO:
        # 1. save any images hosted on tumblr itself
        # 2. handle old articles having been updated
        if posts > 0:
            filename = self.filename()
            # output() yields encoded bytes, so write them untranslated.
            with open(filename, 'wb') as archive:
                archive.write(self.output())
            print("Saved %d posts to %s" % (posts, filename))
        self.images()

    def load(self):
        """
        Tries to load existing archive, so as to only download new articles.
        """
        self.mkdir()
        path = self.filename()
        if not os.path.isfile(path):
            return
        #TODO: copy to filename.1.xml (backup)
        with open(path, 'rb') as archive:
            document = minidom.parseString(archive.read())
        self.dom = document
        self.oldTotal = int(document.getElementsByTagName('posts')[0].getAttribute('total'))
        # New posts are appended after the old ones, so the first post is
        # not necessarily the newest: take the highest id present.
        ids = [int(post.getAttribute('id'))
               for post in document.getElementsByTagName('post')]
        self.latestId = max(ids) if ids else 0
        self.firstFetch = False

    def mkdir(self, name=None):
        """Create the backup directory (or its ``name`` subdirectory) if missing."""
        target = self.dirname(name)
        if not os.path.isdir(target):
            # makedirs so a subdirectory can be created before its parent.
            os.makedirs(target)

    def dirname(self, name=None):
        """Return the backup directory, or the ``name`` subdirectory inside it."""
        master = os.path.join(os.path.dirname(__file__), self.site)
        if name:
            return os.path.join(master, name)
        return master

    def filename(self):
        """Return the path of the XML archive file."""
        return os.path.join(self.dirname(), '%s.xml' % (self.site))
class Image(object):
    """One remote image that belongs to a blog backup.

    ``tumblr`` must provide ``dirname('images')``, the directory the image
    is stored in; ``url`` is where the image is downloaded from. The local
    file name is the last path segment of the URL.
    """

    def __init__(self, tumblr, url):
        self.tumblr = tumblr
        self.url = url
        self.filename = url.split('/')[-1]

    def exists(self):
        """ If theres no extension, then try all FILE_TYPES. """
        basename, ext = os.path.splitext(self.filename)
        dirname = self.tumblr.dirname('images')
        if ext:
            candidates = [self.filename]
        else:
            # download() appends an extension chosen from FILE_TYPES, so
            # any of them may be the one that was used.
            candidates = ['%s.%s' % (self.filename, ex)
                          for ex in set(FILE_TYPES.values())]
        return any(os.path.isfile(os.path.join(dirname, name))
                   for name in candidates)

    def download(self):
        """Fetch the image and store it in the backup's images directory.

        Failures to open the URL and unrecognised content types are
        reported on stdout and the image is skipped, so one bad image does
        not abort the rest of the backup.
        """
        try:
            req = urllib2.urlopen(self.url)
        except urllib2.URLError as ex:
            # HTTPError is a subclass of URLError.
            print('%s: %s' % (ex, self.url))
            return
        try:
            contentType = req.info().get('content-type')
            # Drop any parameters ("image/jpeg; charset=...") and tolerate
            # a missing header before looking the type up.
            mimeType = (contentType or '').split(';')[0].strip().lower()
            ext = FILE_TYPES.get(mimeType)
            if not ext:
                print("Unknown content type: %s for %s" % (contentType, self))
                return
            data = req.read()
        finally:
            # Close the response on every path, including early returns.
            req.close()

        basename, extension = os.path.splitext(self.filename)
        if extension:
            filename = self.filename
        else:
            filename = '%s.%s' % (self.filename, ext)
        # The body is read before the file is opened so that a failed
        # transfer cannot leave a partial file that exists() would accept.
        with open(os.path.join(self.tumblr.dirname('images'), filename), 'wb') as f:
            f.write(data)

    def __str__(self):
        return '<Image: %s>' % self.filename
if __name__ == '__main__':
    # The only command-line argument is the blog's subdomain.
    if len(sys.argv) < 2:
        print("Usage: tumblr.py <tumblr account>")
    else:
        Tumblr(sys.argv[1]).backup()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment