Skip to content

Instantly share code, notes, and snippets.

@thiagomgd

thiagomgd/README.md

Last active May 2, 2021
Embed
What would you like to do?
Reddit Downloader with search

Before running:

  • Install the 'praw' and 'requests' dependencies (pip3 install praw requests)
  • Register an API application with both Reddit and Imgur. (The Imgur one is only necessary for downloading albums.)
  • Replace the placeholder IDs and secrets in the config with your own

In the config:

  • desired_files: filetypes you want to download
  • wanted_links: domains whose links are saved to an output file instead of being downloaded

To run it from the command line, pass up to 2 arguments: the search term and an optional limit. If the limit is not specified, it defaults to 24.

Example: python3 gist_reddit_downloader_search.py 'Boku No Hero Season 5' 12

Because the download script targets anime discussion threads, the subreddit and flair are hard-coded in the script.

import os
import json
import praw
import re
import requests
import string
import sys
import time
from urllib import parse
# User-editable settings: API credentials plus the rules used to classify links.
config = {
    # Reddit API application credentials (replace the placeholders).
    "client_id": "REDDIT_CLIENT_ID",
    "client_secret": "REDDIT_CLIENT_SECRET",
    # Imgur API client id, sent when listing the images of an album.
    "imgur_client_id": "IMGUR_CLIENT_ID",
    # URL suffixes that are downloaded directly.
    "desired_files": (
        ".gif",
        ".gifv",
        ".webm",
        ".jpeg",
        ".jpg",
        ".png",
        ".mp4",
    ),
    # URL substrings whose links are written to the saved-links file.
    "wanted_links": (
        "gyazo",
        "youtube",
        "myanimelist",
        "wikipedia",
        "youtu.be",
        "twitter",
    ),
}

# Module-level state shared by the helpers below; download_posts_media()
# rebinds these while it runs.
ignored = {}
exceptions = {}
errors_file = None    # log file for failed downloads
saved_links = None    # log file for links that are recorded, not downloaded
print_output = None   # log file used by my_print()
post = {}             # url/title/id of the submission being processed

user_agent = "Reddit Downloader 0.1"
def my_print(text):
    """Append *text*, followed by a newline, to the ``print_output`` log file."""
    print(text, file=print_output)
def parse_url(url):
    """Return the ``urllib.parse.urlparse`` result for *url*."""
    components = parse.urlparse(url)
    return components
def valid_filename(filename):
    """Return *filename* with every character outside the allow-list removed.

    Allowed characters are ASCII letters, digits, space and ``-_.()``.
    """
    allowed = "-_.() " + string.ascii_letters + string.digits
    kept = [ch for ch in filename if ch in allowed]
    return "".join(kept)
def format_filename(folder, title, file, extension='', separator=' - '):
    """Build the output path for a downloaded item.

    The title is truncated to 100 characters, joined to *file* with
    *separator*, suffixed with *extension*, and passed through
    ``valid_filename``. *folder* is prepended unchanged.
    """
    raw_name = separator.join((title[:100], file)) + extension
    return folder + valid_filename(raw_name)
def download_file(filename, url, service, id, title=None):
    """Download *url* to *filename* unless that file already exists.

    Any failure (network error, HTTP error status, filesystem error) is
    logged to ``errors_file`` rather than raised, so one bad link does
    not stop the crawl.

    Args:
        filename: Destination path; missing parent directories are created.
        url: Address to fetch.
        service: Not used by this function.
        id: Not used by this function.
        title: Not used by this function.
    """
    try:
        if os.path.isfile(filename):
            return
        parent = os.path.dirname(filename)
        # os.makedirs('') raises, so only create a directory when there is one.
        if parent:
            os.makedirs(parent, exist_ok=True)
        r = requests.get(url, timeout=30)
        # Without this, a 404/5xx error page would be saved as if it were media.
        r.raise_for_status()
        with open(filename, 'wb') as outfile:
            outfile.write(r.content)
    except Exception as e:
        errors_file.write("{} | {} | {}\n".format(url, filename, e))
def save_link(folder, link):
    """Record *link* in the saved-links file, tagged with the current post.

    *folder* is accepted so every entry of ``actions`` shares one
    signature; it is not used here.
    """
    record = "{}-{} | {} - {}\n".format(
        post["id"], post["title"], link["title"], link["url"])
    saved_links.write(record)
def save_exception(link):
    """Record *link* in the saved-links file with an ``Exception:`` prefix."""
    record = "Exception: {}-{} | {} - {}\n".format(
        post["id"], post["title"], link["title"], link["url"])
    saved_links.write(record)
def ignore_link(folder, link):
    """Remember an unhandled *link*, grouped by its domain, in ``exceptions``.

    The previous guard tested ``ignored`` while storing into
    ``exceptions``, so the per-domain list was reset on every call and
    only the most recent link per domain survived. The guard and the
    store now use the same dict, so links accumulate.

    *folder* is accepted so every entry of ``actions`` shares one
    signature; it is not used here.
    """
    domain = parse_url(link["url"]).netloc
    exceptions.setdefault(domain, []).append(link)
def get_service_and_id(link):
    """Split a URL string on ``/`` and return ``(host part, last segment)``.

    The host is taken from index 2 of the split, so *link* must include
    a scheme such as ``https://``.
    """
    segments = link.split('/')
    return segments[2], segments[-1]
def download_link(folder, link, service=None, id=None):
    """Download the file at ``link["url"]`` into *folder*.

    The file name is built from the link title and the last path segment
    of the URL. When *service* is not given, both *service* and *id* are
    derived from the URL.
    """
    url = link["url"]
    if not service:
        service, id = get_service_and_id(url)
    last_segment = url.rsplit('/', 1)[-1]
    target = format_filename(folder, link["title"], last_segment)
    download_file(target, url, service, id, link["title"])
def special_imgur_album(folder, link):
    """Download every image of an Imgur album or gallery link.

    The album id is the last path segment of the URL, with any ``#...``
    fragment removed. The image list comes from the Imgur API using
    ``config["imgur_client_id"]``; each image is saved into a sub-folder
    named after the link title and album id, prefixed with its 1-based
    position.

    Failures are reported on stdout and logged to ``errors_file``. The
    API request is now inside the ``try`` and has a timeout, so a
    network problem no longer aborts the whole crawl or hangs forever.
    """
    url = link["url"]
    rf = url.rfind('#')
    if rf > 0:
        url = url[:rf]
    albumname = url[url.rfind('/') + 1:]
    down_url = 'https://api.imgur.com/3/album/{}/images?client_id={}'.format(
        albumname, config["imgur_client_id"])
    try:
        r = requests.get(down_url, timeout=30)
        api_data = json.loads(r.content.decode("utf-8"))
        data = api_data["data"]
        if 'error' in data:
            errors_file.write("{} | {} | {}\n".format(
                "special_imgur_album", link["url"], data['error']))
            return
        df = format_filename(folder, link["title"], albumname) + "/"
        for i, img in enumerate(data, start=1):
            # Fall back to the image id when the image has no title.
            ttl = str(i) + " - " + (img.get("title") or img["id"])
            download_link(df, {'url': img["link"], 'title': ttl}, 'imgur', img["id"])
    except Exception as inst:
        print("ERROR::: ", albumname)
        print(inst)
        # Also keep a persistent record, like the other handlers do.
        errors_file.write("{} | {} | {}\n".format(
            "special_imgur_album", link["url"], inst))
def special_imgur_image(folder, link):
    """Download a single Imgur image whose URL has no file extension.

    The image is fetched through Imgur's ``/download/<id>`` URL and the
    extension is taken from the response's ``Content-Type`` header. When
    the response has no such header, the link is logged and handed to
    ``ignore_link``.

    Request failures and HTTP error statuses are logged to
    ``errors_file`` instead of propagating and stopping the crawl.
    """
    url = link["url"]
    fn = url[url.rfind('/') + 1:]
    url = url[:url.rfind('/') + 1] + "download/" + fn
    # TODO: change to download_file()
    try:
        r = requests.get(url, timeout=30)
        r.raise_for_status()
    except requests.RequestException as e:
        errors_file.write("{} | {}\n".format(url, e))
        return

    if "Content-Type" not in r.headers:
        errors_file.write("{} | {}\n".format("IGNORED: ", url))
        ignore_link(folder, link)
        return

    try:
        # Drop parameters such as "; charset=utf-8" before deriving the extension.
        ft = r.headers["Content-Type"].split(';', 1)[0].strip()
        fn = fn + "." + ft[ft.rfind('/') + 1:]
        fn = format_filename(folder, link["title"], fn)
        if not os.path.isfile(fn):
            os.makedirs(os.path.dirname(fn), exist_ok=True)
            with open(fn, 'wb') as outfile:
                outfile.write(r.content)
    except Exception as e:
        errors_file.write("{} | {}\n".format(url, e))
def special_imgur_gifv(folder, link):
    """Skip an Imgur ``.gifv`` link: log it and record it via ``save_link``.

    Nothing is downloaded; the ``mp4`` variant of the URL is only used
    to build the log message.
    """
    mp4_url = link["url"].replace("gifv", "mp4")
    mp4_name = mp4_url[mp4_url.rfind('/') + 1:]
    my_print(" SKIPPING IMGUR GIFV {} {}".format(mp4_name, link["url"]))
    save_link(folder, link)
def special_gfycat(folder, link):
    """Download the WebM version of a Gfycat link.

    The clip name is the last path segment of the URL; the Gfycat API is
    queried for it and the ``webmUrl`` of the returned ``gfyItem`` is
    downloaded. Every failure (network error, invalid JSON, missing
    fields) is logged to ``errors_file`` instead of raising.
    """
    gfycat_api_url = 'https://api.gfycat.com/v1/gfycats/{}'
    name = link["url"]
    name = name[name.rfind('/') + 1:]
    try:
        api_request = requests.get(gfycat_api_url.format(name), timeout=30)
        api_data = json.loads(api_request.content.decode("utf-8"))
    except Exception as e:
        # Previously an unreachable API or non-JSON reply aborted the crawl.
        errors_file.write("{} | {} | {}\n".format("GFYCAT ", link, e))
        return

    url = None
    if api_data and "gfyItem" in api_data:
        url = api_data["gfyItem"].get("webmUrl")

    if url:
        fn = format_filename(folder, link["title"], name, '.webm')
        # TODO: change to download_link
        download_file(fn, url, 'gfycat', name, link["title"])
    else:
        errors_file.write("{} | {} | {}\n".format("GFYCAT ", link, api_data))
def special_streamable(folder, link):
    """Download the MP4 version of a Streamable link.

    The video code is the last path segment of the URL; the Streamable
    API is queried for it and the ``files.mp4.url`` entry is downloaded.
    Failures are logged to ``errors_file`` instead of raising.
    """
    apiurl = 'https://api.streamable.com/videos/{}'
    name = link["url"]
    name = name[name.rfind('/') + 1:]
    my_print(link["url"])
    try:
        api_request = requests.get(apiurl.format(name), timeout=30)
    except requests.RequestException as e:
        # The request used to sit outside the try, so a network error
        # stopped the whole crawl.
        errors_file.write("{} | {}\n".format("streamable", e))
        return

    try:
        api_data = json.loads(api_request.content.decode("utf-8"))
        files = api_data.get("files") if api_data else None
        if files and "mp4" in files:
            url = files["mp4"]["url"]
            # Only add a scheme when the URL is scheme-relative ("//host/...");
            # blindly slicing two characters corrupts an absolute URL.
            if url.startswith("//"):
                url = "https:" + url
            fn = format_filename(folder, link["title"], name, '.mp4')
            download_file(fn, url, 'streamable', name, link["title"])
        else:
            errors_file.write("{} | {}\n".format("streamable", api_data))
    except Exception:
        # Narrowed from a bare except so KeyboardInterrupt/SystemExit propagate.
        errors_file.write("{} | {}\n".format("streamable", api_request.content))
# Dispatch table: maps the action name returned by check_link_action()
# to the handler that is called as handler(folder, link).
actions = {
    "download": download_link,
    "special_imgur_album": special_imgur_album,
    "special_imgur_image": special_imgur_image,
    "special_imgur_gifv": special_imgur_gifv,
    "special_gfycat": special_gfycat,
    "special_streamable": special_streamable,
    "save": save_link,
    "ignore": ignore_link,
}
# Markdown link ``[title](url)``:
#   group 1 - title: anything that isn't a closing square bracket
#   group 2 - url: http:// or https:// followed by anything but a closing
#             parenthesis, matched lazily so whitespace before ")" is
#             not captured as part of the URL
# Raw string: the old plain string relied on invalid escape sequences.
_MARKDOWN_LINK_RE = re.compile(r"\[([^]]+)]\(\s*(https?://[^)]+?)\s*\)")


def get_links(comment):
    """Extract the titled Markdown links from ``comment.body``.

    Links without a title are not processed for now.

    Args:
        comment: Object with a ``body`` string attribute.

    Returns:
        A list of ``{"title": ..., "url": ...}`` dicts in order of
        appearance; empty when the body has no titled links.
    """
    return [
        {"title": title, "url": url}
        for title, url in _MARKDOWN_LINK_RE.findall(comment.body)
    ]
def check_link_action(url):
    """Return the name of the ``actions`` entry that should handle *url*.

    Rules are tried in order and the first match wins: ``.gifv`` suffix,
    any configured ``desired_files`` suffix, Imgur album/gallery, other
    Imgur, Gfycat, Streamable, any ``wanted_links`` substring, and
    finally ``"ignore"``.
    """
    if url.endswith(".gifv"):
        return "special_imgur_gifv"
    if url.endswith(config["desired_files"]):
        return "download"
    if "imgur.com/a/" in url or "imgur.com/gallery/" in url:
        return "special_imgur_album"

    # Substring rules; order matters because the first hit is returned.
    substring_rules = (
        ("imgur", "special_imgur_image"),
        ("gfycat", "special_gfycat"),
        ("streamable", "special_streamable"),
    )
    for marker, action in substring_rules:
        if marker in url:
            return action

    if any(x in url for x in config["wanted_links"]):
        return "save"
    return "ignore"
def download_links(folder, links):
    """Classify each link in *links*, log its URL and run its handler."""
    for entry in links:
        target = entry["url"]
        action_name = check_link_action(target)
        my_print(target)
        handler = actions[action_name]
        handler(folder, entry)
def format_comment_dict(c):
    """Return a plain-dict summary of comment *c*.

    The author is reported by name, or as an empty string when
    ``c.author`` is falsy; the permalink is made absolute by prefixing
    ``https://reddit.com``.
    """
    summary = {"author": c.author.name if c.author else ""}
    for field in ("body", "controversiality", "depth", "gilded", "id"):
        summary[field] = getattr(c, field)
    summary["permalink"] = "https://reddit.com" + c.permalink
    summary["score"] = c.score
    return summary
def download_posts_media(reddit, submission_list):
    """Download the media linked in the comments of the given submissions.

    For each submission id, every comment is visited breadth-first; the
    titled Markdown links in its body are classified and handled via
    ``download_links``. Output goes under
    ``downloaded/<subreddit>/[<id>]-<title>/`` and three log files in
    ``downloaded/`` are appended to and synced to disk after each
    submission.

    Args:
        reddit: Client object providing ``submission(id=...)``.
        submission_list: A list of submission ids, or a single id.

    Side effects:
        Rebinds the module globals ``ignored``, ``exceptions``,
        ``errors_file``, ``saved_links``, ``print_output`` and ``post``.
    """
    global ignored, exceptions, errors_file, saved_links, print_output, post
    # Local import so this block needs no change to the file's import list.
    from collections import deque

    folder = "downloaded/"
    os.makedirs(folder, exist_ok=True)

    if not isinstance(submission_list, list):
        submission_list = [submission_list]

    # The context managers close all three logs even if a download raises;
    # previously an exception left them open.
    with open("{}error_logs.txt".format(folder), "a+", encoding="utf-8") as errors_file, \
            open("{}saved_links.txt".format(folder), "a+", encoding="utf-8") as saved_links, \
            open("{}output.txt".format(folder), "a+", encoding="utf-8") as print_output:
        for idx, submission_id in enumerate(submission_list, start=1):
            print('Downloading post {}'.format(idx))
            # NOTE(review): collected but never persisted anywhere in this file.
            comments = []
            ignored = {}
            exceptions = {}

            submission = reddit.submission(id=submission_id)
            my_print('Getting submission {} {}'.format(submission_id, submission.title))

            download_folder = "{}{}/".format(
                folder, valid_filename(submission.subreddit.display_name))
            # Sanitize the title: a raw "/" (or another character the
            # filesystem rejects) would otherwise corrupt the folder path.
            comments_folder = "{}[{}]-{}/".format(
                download_folder, submission.id, valid_filename(submission.title))
            post = {"url": submission.url, "title": submission.title, "id": submission.id}
            os.makedirs(comments_folder, exist_ok=True)

            submission.comments.replace_more(limit=None)
            # Breadth-first walk seeded with the top-level comments; a deque
            # gives O(1) pops from the front, unlike list.pop(0).
            comment_queue = deque(submission.comments[:])
            while comment_queue:
                comment = comment_queue.popleft()
                comments.append(format_comment_dict(comment))
                download_links(comments_folder, get_links(comment))
                comment_queue.extend(comment.replies)

            # Push the logs to disk so progress survives an interrupted run.
            for log in (errors_file, saved_links, print_output):
                log.flush()
                os.fsync(log.fileno())
            time.sleep(0.1)
def search_posts(reddit, query='', subreddit='anime', sort='new', t='year', flair='Episode', l=None):
    """Search a subreddit and return the ids of the matching submissions.

    Args:
        reddit: Client object providing ``subreddit(name)``.
        query: Search text.
        subreddit: Name of the subreddit to search.
        sort: Passed through as the search ``sort`` argument.
        t: Passed through as the search ``time_filter`` argument.
        flair: When non-empty, ``flair_name:"<flair>"`` is appended to
            the query.
        l: Passed through as the search ``limit`` argument.

    Returns:
        A list with the ``id`` of each search result.
    """
    if flair:
        query = '{} flair_name:"{}"'.format(query, flair)
    results = reddit.subreddit(subreddit).search(
        query, sort=sort, syntax='lucene', time_filter=t, limit=l)
    return [found.id for found in results]
# Script entry: parse "SEARCH_TERM [LIMIT]" from the command line, search,
# then download the media of every post found.
if len(sys.argv) < 2:
    # Give a usage message instead of an IndexError traceback.
    sys.exit("Usage: python3 {} SEARCH_TERM [LIMIT]".format(sys.argv[0]))
search_term = sys.argv[1]
try:
    limit = int(sys.argv[2]) if len(sys.argv) >= 3 else 24
except ValueError:
    sys.exit("LIMIT must be an integer, got {!r}".format(sys.argv[2]))

reddit = praw.Reddit(
    user_agent=user_agent,
    client_id=config["client_id"],
    client_secret=config["client_secret"],
)
posts = search_posts(reddit, search_term, l=limit)
print('{} posts found'.format(len(posts)))
download_posts_media(reddit, posts)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment