# DeviantArt Batch Downloader - command-line tool
import html
import json
import os.path
import sys
from time import sleep
from typing import Dict, Optional
import requests
import requests.adapters
from requests.exceptions import ChunkedEncodingError, ConnectionError, ProxyError, SSLError
if len(sys.argv) <= 1:
    print("""
Action types:
  To download all arts of someone's profile:  gallery {PROFILE_NAME} [PARAMS]
  To download a certain gallery:              gallery {PROFILE_NAME} {GALLERY_ID} [PARAMS]
  To search within someone's profile:         search {PROFILE_NAME} {QUERY} [PARAMS]
  To search in all of DeviantArt:             search all {QUERY} [PARAMS]
  To download a favourite folder:             fav {PROFILE_NAME} {FOLDER_ID} [PARAMS]

Parameters:
  Whether to ask before each download (def 1):       ask=<0|1>
  Beginning offset (def 0):                          offset=<+INT> (won't work in global search)
  Lazy-loading limit (def 24):                       limit=<1..60> (won't work in global search)
  Delay (seconds) between downloads (def 1):         delay=<+DECIMAL>
  Original, full-view or preview quality (def org):  quality=<org|ful|pre>
  To use a proxy:                                    proxy=<STR> (e.g. http://127.0.0.1:8580)
  To replace existing files (def 1):                 replace=<0|1>
  To create a separate folder for each profile:      separate=<0|1> (def 1 in gallery, 0 elsewhere)

Answering the question about each post:
  Exit the app:                                quit
  Skip the current lazily loaded posts:        skip
  Download all the lazily loaded posts:        yes
  Download the current item:                   y
  Download the current item in preview:        p
  Download the current item in full view:      f
  Download the current item in original view:  o
  Skip the current item:                       <anything-else/nothing>

Optionally you can put your login-session HTTP headers in "./headers.json"
in order to view private or exclusive content; in some cases these headers
are also required for downloading the original media.
""")
    quit()
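
# Illustrative invocations, assuming the script was saved as "dabd.py" (the
# profile names and folder IDs below are made-up placeholders):
#   python dabd.py gallery someartist ask=0 delay=2
#   python dabd.py gallery someartist 12345678 quality=ful
#   python dabd.py search all landscape quality=pre
#   python dabd.py fav someartist 87654321 separate=1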
# TODO:
# - Cannot find 'prettyName' for GIFs.
def load_headers():
    """Load login-session HTTP headers from "headers_deviantart.json" or "headers.json"."""
    global headers
    first = os.path.isfile('headers_deviantart.json')
    second = os.path.isfile('headers.json')
    if first or second:
        hf = 'headers_deviantart.json' if first else 'headers.json'
        with open(hf, 'r') as f:
            headers = json.loads(f.read())
    else:
        print('WARNING: Missing "headers.json" file!!!')
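
# A minimal sketch of what "headers.json" might contain; the keys and values are
# placeholders to copy from your own logged-in browser session, not a fixed schema:
# {
#     "User-Agent": "<your browser's user-agent string>",
#     "Cookie": "<your deviantart.com session cookies>"
# }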
lazy = '&offset=<OFFSET>&limit=<LIMIT>'
if len(sys.argv) <= 2:
    print('Missing arguments!')
    quit()
pattern, username, kvParams, da_browse_api = '', sys.argv[2].lower(), list(), False
if sys.argv[1] == 'gallery':
    pattern = 'https://www.deviantart.com/_napi/shared_api/gallection/contents?username=' + \
              username + '&type=gallery' + lazy + '&csrf_token=<CSRF>'
    if len(sys.argv) <= 3 or '=' in sys.argv[3]:
        pattern += '&all_folder=true'
        kvParams = sys.argv[3:]
    else:
        pattern += '&folderid=' + sys.argv[3]  # is 46658611 always for FEATURED?
        kvParams = sys.argv[4:]
elif sys.argv[1] == 'search':
    if len(sys.argv) <= 3:
        print('Missing arguments!')
        quit()
    if username != 'all':
        pattern = 'https://www.deviantart.com/_puppy/dashared/gallection/search?username=' + \
                  username + '&type=gallery&order=most-recent&q=' + sys.argv[3] + '&init=true' + lazy + \
                  '&csrf_token=<CSRF>'
    else:
        pattern = 'https://www.deviantart.com/_puppy/da-browse/api/networkbar/search/deviations?q=' + \
                  sys.argv[3] + '&cursor=<CURSOR>' + '&csrf_token=<CSRF>'
        da_browse_api = True
    kvParams = sys.argv[4:]
elif sys.argv[1] == 'fav':
    if len(sys.argv) <= 3:
        print('Missing arguments!')
        quit()
    pattern = 'https://www.deviantart.com/_puppy/dashared/gallection/contents' + \
              '?username=' + username + '&type=collection' + lazy + '&folderid=' + sys.argv[3] + '&csrf_token=<CSRF>'
    kvParams = sys.argv[4:]
else:
    print('Unknown command', sys.argv[1])
    quit()
params = {
    "ask": "1",
    "offset": "0",
    "limit": "24",
    "delay": "1",
    "quality": "org",
    "proxy": "",  # e.g. http://127.0.0.1:8580
    "replace": "1",
    "separate": "1" if sys.argv[1] == 'gallery' else "0",
}
for kv in kvParams:
    s = kv.split("=", 1)  # split on the first '=' only, so values may themselves contain '='
    params[s[0]] = s[1]
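# e.g. the command-line tokens `ask=0 delay=2.5` yield params["ask"] == "0" and params["delay"] == "2.5"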
proxies = {'https': params['proxy']}
headers = None
load_headers()
find = lambda my_list, b: [x for x in my_list if b(x)][0]  # first element of my_list matching predicate b
qua = {'p': 'pre', 'f': 'ful', 'o': 'org'}
qua_keys = list(qua.keys())
html_text: Optional[str] = None
requests.adapters.DEFAULT_RETRIES = 6
delay_before_retry = 3
download_link_starter = 'https://www.deviantart.com/download/'
downloads_folder = 'Downloads'
# Get the CSRF token (and, if necessary, the userId) from the profile page
page: Optional[str] = None
while page is None:
    try:
        page = requests.get("https://www.deviantart.com/" + username, headers=headers, proxies=proxies).text
    except (ConnectionError, ProxyError, SSLError):
        print("Retrying for CSRF...")
        sleep(delay_before_retry)
page = page[page.index("window.__BASEURL__"):]
page = page[0:page.index("</script>")]
befCsrf, aftCsrf = "window.__CSRF_TOKEN__ = '", "';"
if befCsrf not in page:
    print("Couldn't find the crucial CSRF token!")
    quit()
csrf = page[page.index(befCsrf) + len(befCsrf):]
csrf = csrf[0:csrf.index(aftCsrf)]
pattern = pattern.replace("<CSRF>", csrf)
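# For reference, the profile page embeds the token in an inline script roughly
# like this (the value shown is a made-up placeholder):
#   window.__CSRF_TOKEN__ = 'AbCdEf123.XyZ';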
# noinspection PyShadowingNames
def fetch(url: str, offset: int, next_cursor: str = ''):
    """`offset` is for the Shared API and `next_cursor` for the DA-Browse API."""
    global headers
    api_res = None
    while api_res is None:
        try:
            api_res = requests.get(
                url.replace('<OFFSET>', str(offset)).replace('<CURSOR>', next_cursor),
                headers=headers, proxies=proxies).text
        except (ConnectionError, ProxyError, SSLError):
            print('Retrying for API...')
            sleep(delay_before_retry)
    data: Dict = json.loads(api_res)
    del api_res
    if 'error' in data:
        print(url.replace('<OFFSET>', str(offset)).replace('<CURSOR>', next_cursor))
        print(json.dumps(data))
        return
    if 'results' not in data and 'deviations' not in data:
        print("NOTHING FOUND!")
        return
    arr = data['results'] if 'results' in data else data['deviations']
    print("Fetched " + str(len(arr)) + " items.")
    skip_yes = False
    iArt = offset - 1
    for art in arr:
        iArt += 1
        deviation = art if 'deviation' not in art else art['deviation']
        branch = deviation['author']['username'] if params['separate'] == '1' else downloads_folder
        if not os.path.isdir(branch):
            os.mkdir(branch)
        media: Dict = deviation['media']
        file_type = media['baseUri'].split(".")
        file_type = "." + file_type[len(file_type) - 1]
        file_path = os.path.join(branch, media['prettyName'] + file_type)
        already_exists = os.path.isfile(file_path)
        if already_exists and params['replace'] == '0':  # and params['quality'] != 'org'
            print('SKIPPED', media['prettyName'])
            continue
        quality = params['quality']
        if params["ask"] == "1" and not skip_yes:
            title_suffix = ''
            if already_exists:
                title_suffix = ' -- !ALREADY DOWNLOADED!'
            ans = input('[' + str(iArt) + ']: ' + deviation['title'] + title_suffix + '\n' + deviation['url'] + '\n')
            if ans == "quit":
                quit()
            elif ans == "skip":
                break
            if ans not in ['y', 'yes'] + qua_keys:
                continue
            if ans == "yes":
                skip_yes = True
            if ans in qua_keys:
                quality = qua[ans]
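        # Sketch of the 'media' fields used below, inferred from this script rather
        # than an official schema (real responses may differ):
        #   media['baseUri']    e.g. 'https://images-wixmp-<...>.wixmp.com/f/<...>/<name>.png'
        #   media['prettyName'] filename stem, substituted for '<prettyName>' in type URLs
        #   media['token'][0]   access token appended as '?token=...'
        #   media['types']      variants like {'t': 'fullview', 'c': '/v1/fill/<...>'}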
        if quality == 'org' and deviation['isDownloadable']:
            global html_text
            find_download_button(deviation)
            while download_link_starter not in html_text:
                cmd = input("""
!!!YOU NEED TO LOGIN TO CONTINUE!!!
Please update the "headers.json" file,
and then press ENTER to continue...
""")
                if cmd == "quit":
                    quit()
                load_headers()
                find_download_button(deviation)
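            # The deviation page's markup contains an anchor whose href begins with
            # 'https://www.deviantart.com/download/'; take everything up to the closing quote.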
            download = html.unescape(
                download_link_starter + html_text.split(download_link_starter)[1].split('"')[0])
            file_type = download.split('?')[0].split('.')
            file_type = "." + file_type[len(file_type) - 1]
            file_path = os.path.join(branch, media['prettyName'] + file_type)
            if os.path.isfile(file_path) and params['replace'] == '0':
                print("SKIPPED", media['prettyName'])
                continue
        elif quality == 'ful' or (quality == 'org' and not deviation['isDownloadable']):
            full_view = find(media['types'], lambda x: x['t'] == 'fullview')
            if 'c' in full_view:
                download = media['baseUri'] + full_view['c'].replace('<prettyName>', media['prettyName'])
                if 'token' in media:
                    download += "?token=" + media['token'][0]
                # if 'c' exists in 'fullview' and you request the raw 'baseUri' instead, it'll give you a 403 error!
            else:
                download = media['baseUri']
                if 'token' in media:
                    download += "?token=" + media['token'][0]
        else:
            # All the other sources are just thumbnails; avoid them all.
            # Mostly 'social_preview' and 'preview' are equal, but sometimes the social preview
            # refers to https://st.deviantart.net/misc/noentrythumb-200.png with 200x200 dimensions.
            pre_view = find(media['types'], lambda x: x['t'] == 'preview')
            if 'c' in pre_view:
                src = pre_view['c']
            else:
                print("COULD NOT FIND A PROPER URL FOR", deviation['url'])
                continue
            download = media['baseUri'] + src.replace('<prettyName>', media['prettyName'])
            if 'token' in media:
                download += "?token=" + media['token'][0]
        binary = None
        while binary is None:
            try:
                binary = requests.get(
                    download, headers=headers, proxies=proxies, allow_redirects=True, timeout=180).content
            except (ConnectionError, ProxyError, SSLError, ChunkedEncodingError):
                print("Retrying for the image binary...")
                sleep(delay_before_retry)
        with open(file_path, 'wb') as f:
            f.write(binary)
        del binary
        print("Downloaded", media['prettyName'] + file_type)
        # Throttle downloads that weren't individually confirmed (ask=0, or "yes" answered).
        if params["ask"] == "0" or skip_yes:
            sleep(float(params["delay"]))
    # Paginate by recursing; the Shared API pages by offset, the DA-Browse API by cursor.
    if data['hasMore']:
        fetch(url, data['nextOffset'] if not da_browse_api else iArt,
              data['nextCursor'] if 'nextCursor' in data else '')
    else:
        print("END OF LIST.")
def find_download_button(deviation: Dict):
    """Fetch the deviation's HTML page, whose markup contains the download link if available."""
    global headers, html_text
    got_it = False
    while not got_it:
        try:
            html_text = requests.get(deviation['url'], headers=headers, proxies=proxies).text
            got_it = True
        except (ConnectionError, ProxyError, SSLError):
            print("Retrying for the download button...")
            sleep(delay_before_retry)


fetch(pattern.replace("<LIMIT>", params["limit"]), int(params["offset"]))