Last active
January 27, 2024 23:47
-
-
Save fulcrum6378/87ec6ca1caab8a5aa26d4d730a9a1128 to your computer and use it in GitHub Desktop.
DeviantArt Batch Downloader - command-line tool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import html | |
import json | |
import os.path | |
import sys | |
from time import sleep | |
from typing import Dict, Optional | |
import requests | |
import requests.adapters | |
from requests.exceptions import ChunkedEncodingError, ConnectionError, ProxyError, SSLError | |
# No CLI arguments given: print the usage/help text and exit immediately.
# NOTE(review): the "search all" line repeats "in a profile" although it
# describes a global search; the wording is part of the runtime string and
# is intentionally left unchanged here.
if len(sys.argv) <= 1:
    print("""
Action types:
To download all arts of someone's profile: gallery {PROFILE_NAME} [PARAMS]
To download a certain gallery: gallery {PROFILE_NAME} {GALLERY_ID} [PARAMS]
To search for something in a profile: search {PROFILE_NAME} {QUERY} [PARAMS]
To search for something in a profile: search all {QUERY} [PARAMS]
To download a favourite folder: fav {PROFILE_NAME} {FOLDER_ID} [PARAMS]
Parameters:
Whether to ask for each download (def 1): ask=<0|1>
Beginning offset (def 0): offset=<+INT> (won't work in global search)
Lazy loading limit (def 24): limit=<1..60> (won't work in global search)
Delay time (seconds) between each download (def 1): delay=<+DECIMAL>
Original, full-view or preview quality (def org): quality=<org|ful|pre>
To use a proxy: proxy=<STR> (e.g. http://127.0.0.1:8580)
To replace existing files (def 1): replace=<0|1>
To create separate folders for each profile: separate=<0|1> (defaults to 1 in gallery, 0 elsewhere)
Answering the questions about each post:
Exit the app: quit
Skip the current lazily loaded posts: skip
Download all the lazily loaded posts: yes
Download the current item: y
Download the current item in preview: p
Download the current item in full view: f
Download the current item in original view: o
Skip the current item: <anything-else/nothing>
@ Optionally you can put your login session HTTP headers in "./headers.json"
in order to view private or exclusive content; although if you wish to download
the original media in some cases you'll be required to provide the security headers.
""")
    quit()
# TODO: | |
# - Cannot find 'prettyName' for GIFs. | |
def load_headers():
    """Load login-session HTTP headers into the global `headers`.

    Prefers './headers_deviantart.json' over the generic './headers.json'
    (the file mentioned in the help text). If neither file exists, prints a
    warning and leaves `headers` unchanged.
    """
    global headers
    # BUGFIX: the original checked for 'headers_deviantart.json' but then
    # opened 'headers_deviant_art.json', crashing whenever the preferred
    # file actually existed.
    if os.path.isfile('headers_deviantart.json'):
        hf = 'headers_deviantart.json'
    elif os.path.isfile('headers.json'):
        hf = 'headers.json'
    else:
        # Message fixed to name the file the code actually looks for.
        print('WARNING: Missing "headers.json" file!!!')
        return
    # Context manager closes the handle instead of leaking it.
    with open(hf, 'r') as f:
        headers = json.load(f)
# Lazy-loading query-string fragment; <OFFSET>/<LIMIT> are substituted later.
lazy = '&offset=<OFFSET>&limit=<LIMIT>'
# NOTE(review): sys.argv[2] is read before any argument-count check, so any
# command invoked with fewer than two positional arguments raises IndexError
# here rather than printing "Missing arguments!" -- confirm intended.
pattern, username, kvParams, da_browse_api = '', sys.argv[2].lower(), list(), False
if sys.argv[1] == 'gallery':
    pattern = 'https://www.deviantart.com/_napi/shared_api/gallection/contents?username=' + \
              username + '&type=gallery' + lazy + '&csrf_token=<CSRF>'
    # No gallery id (or the next arg is already a key=value param): whole profile.
    if len(sys.argv) <= 3 or '=' in sys.argv[3]:
        pattern += '&all_folder=true'
        kvParams = sys.argv[3:]
    else:
        pattern += '&folderid=' + sys.argv[3]  # is 46658611 always for FEATURED?
        kvParams = sys.argv[4:]
elif sys.argv[1] == 'search':
    if len(sys.argv) <= 3:
        print('Missing arguments!')
        quit()
    if username != 'all':
        # Profile-scoped search goes through the Shared API (offset-paginated).
        pattern = 'https://www.deviantart.com/_puppy/dashared/gallection/search?username=' + \
                  username + '&type=gallery&order=most-recent&q=' + sys.argv[3] + '&init=true' + lazy + \
                  '&csrf_token=<CSRF>'
    else:
        # Global search goes through the cursor-based DA-Browse API instead.
        pattern = 'https://www.deviantart.com/_puppy/da-browse/api/networkbar/search/deviations?q=' + \
                  sys.argv[3] + '&cursor=<CURSOR>' + '&csrf_token=<CSRF>'
        da_browse_api = True
    kvParams = sys.argv[4:]
elif sys.argv[1] == 'fav':
    if len(sys.argv) <= 3:
        print('Missing arguments!')
        quit()
    pattern = 'https://www.deviantart.com/_puppy/dashared/gallection/contents' + \
              '?username=' + username + '&type=collection' + lazy + '&folderid=' + sys.argv[3] + '&csrf_token=<CSRF>'
    kvParams = sys.argv[4:]
else:
    print('Unknown command', sys.argv[1])
    quit()
# Default values for the optional key=value CLI parameters (see help text).
params = {
    "ask": "1",
    "offset": "0",
    "limit": "24",
    "delay": "1",
    "quality": "org",
    "proxy": "",  # http://127.0.0.1:8580
    "replace": "1",
    "separate": "1" if sys.argv[1] == 'gallery' else "0",
}
# Override defaults with the user-supplied key=value pairs.
# NOTE(review): split("=") without maxsplit drops everything after a second
# '=' in the value; split("=", 1) would be safer -- confirm no such values.
for kv in kvParams:
    s = kv.split("=")
    params[s[0]] = s[1]
# NOTE(review): only the 'https' scheme is routed through the proxy here.
proxies = {'https': params['proxy']}
headers = None  # login-session HTTP headers, populated by load_headers()
load_headers()
# First element of my_list satisfying predicate b (IndexError if none match).
find = lambda my_list, b: [x for x in my_list if b(x)][0]
# Map of one-letter prompt answers to quality codes.
qua = {'p': 'pre', 'f': 'ful', 'o': 'org'}
qua_keys = list(qua.keys())
html_text: Optional[str] = None  # last fetched deviation page (find_download_button)
requests.adapters.DEFAULT_RETRIES = 6
delay_before_retry = 3  # seconds to sleep before retrying a failed request
download_link_starter = 'https://www.deviantart.com/download/'
downloads_folder = 'Downloads'  # shared destination folder when separate=0
# Get CSRF token and if necessary the userId
# Fetch the profile page (retrying on transient network errors) to scrape the
# CSRF token required by the internal APIs.
page: Optional[str] = None
while page is None:
    try:
        page = requests.get("https://www.deviantart.com/" + username, headers=headers, proxies=proxies).text
    except (ConnectionError, ProxyError, SSLError):
        print("Retrying for CSRF...")
        sleep(delay_before_retry)
# Narrow the page down to the inline <script> carrying the client-side globals.
page = page[page.index("window.__BASEURL__"):]
page = page[0:page.index("</script>")]
befCsrf, aftCsrf = "window.__CSRF_TOKEN__ = \'", "\';"
if befCsrf not in page:
    print("Couldn\'t find the crucial CSRF token!")
    quit()
# Extract the token between its delimiters and bake it into the URL pattern.
csrf = page[page.index(befCsrf) + len(befCsrf):]
csrf = csrf[0:csrf.index(aftCsrf)]
pattern = pattern.replace("<CSRF>", csrf)
# noinspection PyShadowingNames | |
def fetch(url: str, offset: int, next_cursor: str = ''):
    """ `offset` is for the Shared API and `next_cursor` for the DA-Browse API.

    Fetches one page of listing results, downloads each item (optionally
    prompting the user per item), then recurses to the next page while the
    API reports more results.
    """
    global headers
    # Retry the listing request indefinitely on transient network errors.
    api_res = None
    while api_res is None:
        try:
            api_res = requests.get(
                url.replace('<OFFSET>', str(offset)).replace('<CURSOR>', next_cursor),
                headers=headers, proxies=proxies).text
        except (ConnectionError, ProxyError, SSLError):
            print('Retrying for API...')
            sleep(delay_before_retry)
    data: Dict = json.loads(api_res)
    del api_res
    if 'error' in data:
        # Dump the failing URL and raw payload for diagnosis, then stop.
        print(url.replace('<OFFSET>', str(offset)).replace('<CURSOR>', next_cursor))
        print(json.dumps(data))
        return
    if 'results' not in data and 'deviations' not in data:  # or 'error' in data
        print("NOTHING FOUND!")
        return
    # Shared API pages items under 'results'; DA-Browse API under 'deviations'.
    arr = data['results'] if 'results' in data else data['deviations']
    print("Fetched " + str(len(arr)) + " items.")
    skip_yes = False  # becomes True after the user answers "yes" (stop asking)
    iArt = offset - 1  # absolute index of the current item, across pages
    for art in arr:
        iArt += 1
        # Favourite listings wrap each item in an extra 'deviation' object.
        deviation = art if 'deviation' not in art else art['deviation']
        # Per-author folder when separate=1, otherwise one shared folder.
        branch = deviation['author']['username'] if params['separate'] == '1' else downloads_folder
        if not os.path.isdir(branch):
            os.mkdir(branch)
        media: Dict = deviation['media']
        # Derive the file extension from the media base URI.
        file_type = media['baseUri'].split(".")
        file_type = "." + file_type[len(file_type) - 1]
        file_path = os.path.join(branch, media['prettyName'] + file_type)
        already_exists = os.path.isfile(file_path)
        if already_exists and params['replace'] == '0':  # and params['quality'] != 'org'
            print('SKIPPED', media['prettyName'])
            continue
        quality = params['quality']
        # Per-item interactive prompt (unless ask=0 or "yes" was chosen earlier).
        if params["ask"] == "1" and not skip_yes:
            title_suffix = ''
            if already_exists:
                title_suffix = ' -- !ALREADY DOWNLOADED!'
            ans = input('[' + str(iArt) + ']: ' + deviation['title'] + title_suffix + '\n' + deviation['url'] + '\n')
            if ans == "quit":
                quit()
            elif ans == "skip":
                break
            if ans not in ['y', 'yes'] + qua_keys:
                continue
            if ans == "yes":
                skip_yes = True
            if ans in qua_keys:
                # A one-letter answer overrides the quality for this item only.
                quality = qua[ans]
        if quality == 'org' and deviation['isDownloadable']:
            # Original quality: scrape the deviation page for its download
            # link, asking the user to refresh login headers until it appears.
            global html_text
            find_download_button(deviation)
            while download_link_starter not in html_text:
                cmd = input("""
!!!YOU NEED TO LOGIN TO CONTINUE!!!
Please update the "headers.json" file,
and then press ENTER to continue...
""")
                if cmd == "quit":
                    quit()
                load_headers()
                find_download_button(deviation)
            download = html.unescape(
                download_link_starter + html_text.split(download_link_starter)[1].split('\"')[0])
            # The original file's extension may differ from the base URI's,
            # so recompute the destination path and re-check for duplicates.
            file_type = download.split('?')[0].split('.')
            file_type = "." + file_type[len(file_type) - 1]
            file_path = os.path.join(branch, media['prettyName'] + file_type)
            if os.path.isfile(file_path) and params['replace'] == '0':
                print("SKIPPED", media['prettyName'])
                continue
        elif quality == 'ful' or (quality == 'org' and not deviation['isDownloadable']):
            full_view = find(media['types'], lambda x: x['t'] == 'fullview')
            if 'c' in full_view:
                download = media['baseUri'] + full_view['c'].replace('<prettyName>', media['prettyName'])
                if 'token' in media: download += "?token=" + media['token'][0]
                # if 'c' is already in 'fullview' and you acquire the raw 'baseUri', it'll give you 403 error!
            else:
                download = media['baseUri']
                if 'token' in media: download += "?token=" + media['token'][0]
        else:
            # all the other sources are just thumbnails, avoid them all.
            # mostly 'social_preview' and 'preview' are equal; but sometimes social preview
            # refers to https://st.deviantart.net/misc/noentrythumb-200.png with 200x200 dimensions.
            pre_view = find(media['types'], lambda x: x['t'] == 'preview')
            if 'c' in pre_view:
                src = pre_view['c']
            else:
                print("COULD NOT FIND A PROPER URL FOR", deviation['url'])
                continue
            download = media['baseUri'] + src.replace('<prettyName>', media['prettyName'])
            if 'token' in media: download += "?token=" + media['token'][0]
        # Retry the binary download indefinitely on transient network errors.
        binary = None
        while binary is None:
            try:
                binary = requests.get(
                    download, headers=headers, proxies=proxies, allow_redirects=True, timeout=180).content
            except (ConnectionError, ProxyError, SSLError, ChunkedEncodingError):
                print("Retrying for the image binary...")
                sleep(delay_before_retry)
        open(file_path, 'wb').write(binary)
        del binary
        print("Downloaded", media['prettyName'] + file_type)
        if params["ask"] == "0" and not skip_yes:
            sleep(float(params["delay"]))
    if data['hasMore']:
        # NOTE(review): pagination is recursive; an extremely long listing
        # could approach Python's recursion limit -- confirm acceptable.
        fetch(url, data['nextOffset'] if not da_browse_api else iArt,
              data['nextCursor'] if 'nextCursor' in data else '')
    else:
        print("END OF LIST.")
def find_download_button(deviation: Dict):
    """Fetch the deviation's HTML page into the global `html_text`.

    Keeps retrying on transient network/proxy errors, pausing
    `delay_before_retry` seconds between attempts.
    """
    global headers, html_text
    while True:
        try:
            html_text = requests.get(deviation['url'], headers=headers, proxies=proxies).text
            return
        except (ConnectionError, ProxyError, SSLError):
            print("Retrying for the download button...")
            sleep(delay_before_retry)
fetch(pattern.replace("<LIMIT>", params["limit"]), int(params["offset"])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment