Skip to content

Instantly share code, notes, and snippets.

@Chaz6
Created December 3, 2018 18:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Chaz6/611fa1a9540c765f7bd2b25f7f05fc3b to your computer and use it in GitHub Desktop.
Threaded tumblr image/video downloader
#!/usr/bin/env python
import os, sys, re, time
from shutil import copyfileobj
from urllib.request import urlopen, unquote
from xml.etree import ElementTree as ET
from socket import error as socket_error
import socket
import requests
from six.moves import queue as Queue
from threading import Thread
# Per-request socket timeout, in seconds.
TIMEOUT = 10
# Maximum number of download attempts per file.
RETRY = 5
# Post offset at which to start paging the API.
START = 0
# Number of posts requested per API page.
NUM = 50
# Number of concurrent download worker threads.
THREADS = 10
origGetAddrInfo = socket.getaddrinfo  # keep a handle on the real resolver


def getAddrInfoWrapper(host, port, family=0, socktype=0, proto=0, flags=0):
    """Drop-in replacement for socket.getaddrinfo that forces IPv4.

    Whatever address family the caller asked for is discarded and AF_INET is
    substituted, so every name lookup in this process resolves to IPv4 only.
    """
    forced_family = socket.AF_INET
    return origGetAddrInfo(host, port, forced_family, socktype, proto, flags)


# Install the wrapper process-wide: urllib, requests, etc. all route through
# socket.getaddrinfo, so this pins the whole script to IPv4.
socket.getaddrinfo = getAddrInfoWrapper
def video_hd_match():
    """Build a matcher that extracts the HD video URL from a tumblr
    video-player embed.

    Returns:
        A function that, given the embed text, returns the unescaped HD URL,
        or None when there is no HD variant (``"hdUrl":false``), no match at
        all, or the input is not searchable text.
    """
    hd_pattern = re.compile(r'.*"hdUrl":("([^\s,]*)"|false),')

    def match(video_player):
        try:
            hd_match = hd_pattern.search(video_player)
        except TypeError:
            # video_player was not a string (e.g. None).
            print("TypeError in video_hd_match(): %s" % video_player)
            return None
        if hd_match is None or hd_match.group(1) == 'false':
            # No match, or the post explicitly has no HD variant.
            return None
        # The URL is JSON-escaped ("http:\/\/..."); strip the backslashes.
        return hd_match.group(2).replace('\\', '')

    return match
def video_default_match():
    """Build a matcher that extracts the standard-quality video URL from a
    tumblr video-player embed.

    Returns:
        A function that, given the embed HTML, returns the value of the
        src="..." attribute (followed by a space in the markup), or None
        when there is no match or the input is not searchable text.
    """
    default_pattern = re.compile(r'.*src="(\S*)" ', re.DOTALL)

    def match(video_player):
        try:
            default_match = default_pattern.search(video_player)
        except TypeError:
            # video_player was not a string (e.g. None).
            print("TypeError in video_default_match(): %s" % video_player)
            return None
        if default_match is None:
            return None
        return default_match.group(1)

    return match
class DownloadWorker(Thread):
    """Queue consumer that downloads (url, file_path) jobs until it receives
    a (None, ...) sentinel."""

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            download_url, file_path = self.queue.get()
            if download_url is None:
                # BUG FIX: mark the sentinel done before stopping, otherwise
                # queue.join() would block forever on the unfinished item.
                self.queue.task_done()
                break
            self.download(download_url, file_path)
            self.queue.task_done()

    def download(self, download_url, file_path):
        """Fetch download_url into file_path, retrying up to RETRY times.

        Files that already exist are never re-downloaded.  A 403 response is
        treated as permanent (no retry).  After the final failed attempt any
        partial file is removed and an error is printed.
        """
        if os.path.isfile(file_path):
            return
        print("Downloading %s from %s." % (file_path, download_url))
        retry_times = 0
        while retry_times < RETRY:
            try:
                resp = requests.get(download_url,
                                    stream=True,
                                    proxies=None,
                                    timeout=TIMEOUT)
                if resp.status_code == 403:
                    # Access denied is permanent; force the loop to give up.
                    retry_times = RETRY
                    raise Exception("Access Denied")
                with open(file_path, 'wb') as fh:
                    for chunk in resp.iter_content(chunk_size=1024):
                        fh.write(chunk)
                break
            except Exception:
                # Network/HTTP error: fall through and try again.
                pass
            retry_times += 1
        else:
            # Every attempt failed: drop any partially written file.
            try:
                os.remove(file_path)
            except OSError:
                pass
            print("Failed to retrieve %s from %s." % (file_path,
                                                      download_url))
class TumblrCrawler(object):
    """Crawl a tumblr site's /api/read XML feed and queue every photo and
    video it finds for download by a pool of DownloadWorker threads."""

    def __init__(self, site):
        self.site = site
        self.queue = Queue.Queue()
        self._register_regex_match_rules()
        self.scheduling()

    def _register_regex_match_rules(self):
        # Try the HD video URL first, then fall back to the plain src URL.
        self.regex_rules = [video_hd_match(), video_default_match()]

    def scheduling(self):
        # Spin up the worker pool, feed it, then wait for it to drain.
        for _ in range(THREADS):
            worker = DownloadWorker(self.queue)
            worker.daemon = True
            worker.start()
        # BUG FIX: crawl the site passed to this instance, not the
        # module-level global that happens to share the name "site".
        self.download_media(self.site)
        self.queue.join()

    def download_media(self, site):
        """Page through the site's XML API and enqueue every media URL."""
        base_url = "http://{0}.tumblr.com/api/read?num={1}&start={2}"
        start = START
        while True:
            media_url = base_url.format(site, NUM, start)
            response = requests.get(media_url)
            if response.status_code == 404:
                print("Site %s does not exist" % site)
                break
            try:
                tree = ET.fromstring(response.content)
                post_tags = tree.findall('./posts/post')
                if not post_tags:
                    # Ran past the last page of posts.
                    break
                for post_tag in post_tags:
                    post_id = post_tag.attrib['id']
                    post_type = post_tag.attrib['type']
                    # date-gmt looks like "YYYY-MM-DD HH:MM:SS GMT".
                    post_date = post_tag.attrib['date-gmt'].split(" ")[0]
                    if post_type == 'video':
                        self._queue_video(site, post_tag, post_id, post_date)
                    if post_type == 'photo':
                        self._queue_photos(site, post_tag, post_id, post_date)
                start += NUM
            except ET.ParseError:
                # Unparsable page: hand one worker a stop sentinel and quit.
                self.queue.put((None, None))
                print("ET.ParseError")
                break

    def _queue_video(self, site, post_tag, post_id, post_date):
        """Enqueue the video file of a video post, preferring the HD URL."""
        if post_tag.find('.//video-source') is None:
            return
        for video_player_tag in post_tag.findall('./video-player'):
            # Skip the scaled-down embeds; only the unscaled player carries
            # the canonical URLs.
            if 'max-width' in video_player_tag.attrib:
                continue
            if video_player_tag.text is None:
                continue
            video_match = None
            for regex_rule in self.regex_rules:
                video_match = regex_rule(video_player_tag.text)
                if video_match is not None:
                    try:
                        extension = post_tag.findall(
                            './video-source/extension')[0].text
                        file_name = "%s_%s_%s.%s" % (
                            site, post_date, post_id, extension)
                        self.queue.put((video_match, file_name))
                    except IndexError:
                        # Post lacks <video-source><extension>; skip it.
                        pass
                    break
            if video_match is None:
                print("Could not find video url for post: %s" % post_id)

    def _queue_photos(self, site, post_tag, post_id, post_date):
        """Enqueue the 1280px variant(s) of a photo or photoset post."""
        if not post_tag.findall('./photoset'):
            # Single-photo post.
            for photo_url_tag in post_tag.findall('./photo-url'):
                if photo_url_tag.attrib['max-width'] == "1280":
                    photo_url = photo_url_tag.text
                    file_name = "%s_%s_%s%s" % (
                        site, post_date, post_id,
                        os.path.splitext(photo_url)[1])
                    self.queue.put((photo_url, file_name))
        else:
            # Photoset: keep each photo's offset ("o1", "o2", ...) so the
            # file names stay unique within the post.
            for photo_tag in post_tag.findall('./photoset/photo'):
                for photo_url_tag in photo_tag.findall('./photo-url'):
                    if photo_url_tag.attrib['max-width'] == "1280":
                        photo_url = photo_url_tag.text
                        photo_offset = re.search(
                            r'^o(.*)$',
                            photo_tag.attrib['offset']).groups()[0]
                        file_name = "%s_%s_%s_%s%s" % (
                            site, post_date, post_id, photo_offset,
                            os.path.splitext(photo_url)[1])
                        self.queue.put((photo_url, file_name))
if __name__ == "__main__":
    site = None
    # Require the tumblr blog name as the sole command-line argument.
    if len(sys.argv) < 2:
        # BUG FIX: the original used Python 2 syntax (print >> sys.stderr),
        # which raises a TypeError under Python 3.
        print("Pass tumblr name as argument", file=sys.stderr)
        sys.exit()
    site = sys.argv[1]
    # The constructor starts the workers, crawls the site and blocks until
    # the download queue has drained.
    TumblrCrawler(site)
    print("Finished!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment