Skip to content

Instantly share code, notes, and snippets.

@sbusso
Forked from un1tz3r0/youtubemusicdownloader.py
Created August 19, 2023 22:46
Show Gist options
  • Save sbusso/2c2062e2547dcb7dec85282b28ce3d0d to your computer and use it in GitHub Desktop.
Save sbusso/2c2062e2547dcb7dec85282b28ce3d0d to your computer and use it in GitHub Desktop.
This script uses ytmusicapi and pytube together to download your playlists, history or 'liked' songs as high-quality audio-only streams from Youtube Music.
''' This script uses ytmusicapi and pytube together to download your playlists, history or 'liked' songs as
high-quality audio-only streams from Youtube Music, which are protected by a "signatureCipher" obfuscation scheme.
To use it, first install [ytmusicapi] and [pytube] using pip, then follow the instructions for creating the auth
file from the response in an authenticated session to a watch-page request as found in your browser's dev-tools.
The downloaded files are placed in ~/Music, named with the artist and track metadata, and will be skipped instead
of downloaded again next time it is run, based on the videoIds of the downloaded songs.
Merry Xmas - V.
'''
import ytmusicapi
import pytube
import re, os, sys
import requests
from urllib.parse import parse_qs, urljoin
import json
# `clear_eol` is the terminal control sequence appended to progress messages to erase
# the rest of the current line. Ask blessings for it when that optional package is
# installed, otherwise fall back to the plain ANSI "erase to end of line" escape.
try:
    import blessings
    clear_eol = blessings.Terminal().clear_eol
except ImportError:
    clear_eol = "\x1b[K"
class DownloaderMixin:
    ''' Mixin for the ytmusicapi.YTMusic class that uses parts of pytube to add high-quality
    streaming and batch track and playlist downloading to it. The methods here read
    self.headers, self.proxies and self.language and call self.get_song() and
    self.get_playlist(), all of which must be provided by the class this is mixed into.
    Some examples of usage are given at the end of the module. '''

    # Matches every character that is NOT allowed to appear in a generated file name.
    _FILENAME_JUNK = re.compile(r"[^a-zA-Z 0-9_\(\)\[\]\:\'\"\@\!\#\$\%\&\=\+\,\.\<\>\;\|\{\}-]")

    @staticmethod
    def _sanitize_filename(name: str) -> str:
        ''' Drop every character outside the allowed set and trim surrounding whitespace. '''
        return DownloaderMixin._FILENAME_JUNK.sub("", name).strip()

    @staticmethod
    def _parse_duration(s: str) -> int:
        ''' Convert a "ss", "mm:ss" or "hh:mm:ss" string to a number of seconds. When more
        than three fields are present only the last three are used. Raises ValueError if a
        field is not an integer. '''
        seconds = 0
        for field in s.split(":")[-3:]:
            seconds = seconds * 60 + int(field)
        return seconds

    @staticmethod
    def _pick_best_format(fmts: list) -> dict:
        ''' Return the format with the highest average bitrate, preferring audio-only formats
        and falling back to all formats when there are none. Raises ValueError when fmts
        is empty. '''
        audio_only = [fmt for fmt in fmts if fmt.get('mimeType', '').startswith('audio')]
        candidates = audio_only or fmts
        if not candidates:
            raise ValueError("no downloadable formats available")
        # max() with a key never compares the format dicts themselves, so formats with
        # equal bitrates no longer raise TypeError (the first one wins). Formats lacking
        # 'averageBitrate' are ranked by 'bitrate' if present, else last.
        return max(candidates, key=lambda fmt: fmt.get('averageBitrate', fmt.get('bitrate', 0)))

    @staticmethod
    def _is_downloaded(video_id: str, dest_dir: str) -> bool:
        ''' Tell whether dest_dir already holds a file whose name contains "[<video_id>]". '''
        marker = '[' + video_id + ']'
        return any(marker in fn for fn in os.listdir(dest_dir))

    def get_streaming_data_decrypted(self, videoId: str) -> tuple:
        ''' This is based on the YTMusic.get_streaming_data() method but it makes use of pytube to
        decode the signatureCipher obfuscation that "protects" the higher quality adaptiveFormat
        stream URLs from being enjoyed by "bots". Robots deserve access to the same high-fidelity
        listening experience that we humans take for granted.

        Returns a tuple (streamingData, formats): the raw 'streamingData' dict of the player
        response, and a list of every entry of its 'formats' and 'adaptiveFormats' lists that
        has a usable 'url' key (signed URLs are deciphered and stored under 'url' in place).

        Raises Exception when the response carries no player_response or no streamingData.

        NOTE(review): this depends on the legacy /get_video_info endpoint - confirm it is
        still being served before relying on this method. '''
        # fetch /get_video_info? which should have a watch URL in there somewhere...
        endpoint = "https://www.youtube.com/get_video_info"
        params = {"video_id": videoId, "hl": self.language, "el": "detailpage",
                  "c": "WEB_REMIX", "cver": "0.1"}
        response = requests.get(endpoint, params, headers=self.headers, proxies=self.proxies)
        text = parse_qs(response.text)
        if 'player_response' not in text:
            raise Exception('This video is not playable (no player_response key in /get_video_info? response)')
        player_response = json.loads(text['player_response'][0])
        if 'streamingData' not in player_response:
            raise Exception('This video is not playable (no streamingData key in player_response key of /get_video_info? response)')
        try:
            watch_url = player_response['microformat']['microformatDataRenderer']['urlCanonical']
        except (KeyError, TypeError):
            # the microformat section is not always there; build the watch URL ourselves
            watch_url = f"https://music.youtube.com/watch?v={videoId}"

        # get the watch page's HTML, which we need to get the base.js URL that determines how
        # pytube unscrambles the signatureCipher
        watch_response = requests.get(watch_url, headers=self.headers, proxies=self.proxies)
        watch_html = watch_response.text

        # this is where pytube comes in... given the watch page HTML, it extracts for us the URL of
        # the base.js for the video player, which is where the signatureCipher is descrambled by a
        # variable algorithm coded in minified, obfuscated javascript. thankfully, the task of
        # extracting from the javascript the steps needed to properly unscramble the signatureCipher
        # is also handled by pytube.
        player_js_url = pytube.extract.get_ytplayer_js(watch_html)
        player_js_response = requests.get(urljoin(watch_url, player_js_url), params, headers=self.headers, proxies=self.proxies)
        player_js = player_js_response.text
        cipher = pytube.cipher.Cipher(js = player_js)

        # okay, now we collect all the streams available and apply the cipher to any that have signed
        # URLs. TODO: DASH manifests are not handled.
        allformats = []
        sdata = player_response['streamingData']
        for formats_key in ('formats', 'adaptiveFormats'):
            for fmt in sdata.get(formats_key, []):
                if 'signatureCipher' in fmt:
                    cipher_query = parse_qs(fmt['signatureCipher'])
                    sig = cipher.get_signature(cipher_query['s'][0])
                    # 'sp' names the query parameter the deciphered signature goes into
                    fmt['url'] = cipher_query['url'][0] + '&' + cipher_query['sp'][0] + '=' + sig
                if 'url' not in fmt:
                    print(f"[warn] streamingData contains format with itag {fmt.get('itag')} without a url key in get_streaming_data_decrypted({repr(videoId)}):\n\n{repr(fmt)}\n")
                    continue
                allformats.append(fmt)
        return (sdata, allformats)

    def download_song(self, video_id: str, dest_dir: str, chunk_size: int = 1024*1024, overwrite: bool = False, keep_incomplete: bool = False):
        ''' Download the best available audio stream of one song into dest_dir.

        The file is named "<artists> - <title> [<videoId>].<ext>" (artists and title are left
        out when the song metadata lacks them), with the extension taken from the sub-type
        of the chosen stream's mime type.

        video_id:        id of the song, passed to self.get_song()
        dest_dir:        target directory ("~" is expanded; created if missing)
        chunk_size:      number of bytes requested per chunk while streaming to disk
        overwrite:       download even if the target file already exists
        keep_incomplete: keep a partially written file if the download fails

        Returns True after a completed download and False when the download was skipped
        because the file exists and overwrite is not set. Raises RuntimeError when no
        stream could be selected; errors from the HTTP request or from writing the file
        propagate to the caller. '''
        dest_dir = os.path.expanduser(dest_dir)
        song = self.get_song(video_id)
        song_id = song.get('videoId', video_id)

        # NOTE(review): assumes song['artists'], when present, is a list of plain strings.
        # dict.fromkeys() removes duplicates while keeping the order, so that the file name
        # is the same on every run (joining a set() is not).
        unique_artists = list(dict.fromkeys(artist for artist in (song.get('artists') or []) if artist))
        artists = ", ".join(unique_artists) if unique_artists else None
        title = song.get('title')

        if title is not None and artists is not None:
            filename = artists + " - " + title + " [" + song_id + "]"
        elif title is not None:
            filename = title + " [" + song_id + "]"
        else:
            filename = f"[{song_id}]"

        # pick from available streams one that is audio-only with the highest average bitrate,
        # hence highest objective quality
        try:
            sdata, fmts = self.get_streaming_data_decrypted(song_id)
            bestfmt = self._pick_best_format(fmts)
        except Exception as err:
            raise RuntimeError(f"Error selecting suitable streaming format: {err}") from err

        fileext = bestfmt['mimeType'].split("/")[1].split(";")[0] # use sub-type from mimetype as file extension
        fullfilename = os.path.join(dest_dir, self._sanitize_filename(filename) + "." + fileext)

        if os.path.exists(fullfilename) and not overwrite:
            print(f"Not downloading videoId {repr(song_id)}, would overwrite file {repr(fullfilename)}...")
            return False

        print(f"Downloading videoId {repr(song_id)} to file {repr(fullfilename)}...")
        os.makedirs(dest_dir, exist_ok=True)
        response = requests.get(bestfmt['url'], stream=True, headers=self.headers, proxies=self.proxies)
        # fail here rather than saving the body of an error response as if it were audio
        response.raise_for_status()

        started = False
        wrotebytes = 0
        complete = False
        try:
            with open(fullfilename, "wb") as fout:
                started = True
                for chunk in response.iter_content(chunk_size=chunk_size):
                    fout.write(chunk)
                    wrotebytes = wrotebytes + len(chunk)
                    # end="" keeps the cursor on this line so that the trailing \r lets the
                    # next progress message overwrite it
                    print(f"Downloaded {wrotebytes//1024} kbytes...{clear_eol}\r", end="", flush=True)
            complete = True
            print(f"Downloaded {wrotebytes//1024} kbytes.{clear_eol}\n", flush=True)
        finally:
            if started and not complete and not keep_incomplete:
                print(f"Cleaning up partially downloaded file {repr(fullfilename)}...")
                os.remove(fullfilename)
        return True

    def download_playlist(self, playlist, dest_dir = "~/Music", limit_duration = 25*60, no_uploaded = True):
        ''' Download every song of a playlist into dest_dir, skipping the ones already there.

        playlist may be specified in a few ways:
        1. playlist id (a string, looked up with self.get_playlist())
        2. return value of get_playlist() etc. (dict containing 'tracks' key with a list of dicts with 'videoId' keys)
        3. list of dicts with videoId's
        4. list of videoId strings

        dest_dir:       target directory ("~" is expanded; created if missing)
        limit_duration: songs whose 'duration' is this many seconds or longer are skipped;
                        None disables the limit. Songs without a usable duration are downloaded.
        no_uploaded:    accepted for backward compatibility, currently not used.

        A song counts as already downloaded when a file in dest_dir has its videoId in
        brackets in the name. A failure to download one song is reported and does not stop
        the rest. Raises KeyError when the playlist has no 'tracks' or an item has no
        'videoId' key. '''
        dest_dir = os.path.expanduser(dest_dir)
        os.makedirs(dest_dir, exist_ok=True)

        playlist_items = playlist
        if isinstance(playlist_items, (str, bytes)):
            # if playlist is a string, assume it is a playlist id and fetch the playlist
            playlist_items = self.get_playlist(playlist_items)
        if hasattr(playlist_items, 'keys'):
            # dict-like: a playlist data structure as returned by get_playlist(), whether it
            # was passed in or fetched just above. the songs are under its 'tracks' key.
            if 'tracks' not in playlist_items.keys():
                raise KeyError("playlist does not have a 'tracks' key!")
            playlist_items = playlist_items['tracks']

        for listitem in list(playlist_items):
            if isinstance(listitem, str):
                listitem = {'videoId': listitem}
            if 'videoId' not in listitem:
                raise KeyError("item in playlist_items does not have a videoId!")
            video_id = listitem['videoId']
            title = listitem.get('title')

            if not video_id:
                print(f"Skipping {repr(title)} because it has no videoId.")
                continue
            if self._is_downloaded(video_id, dest_dir):
                print(f"Skipping videoId {video_id} - {repr(title)} because a file ending with the same id exists.")
                continue

            duration = listitem.get('duration')
            if duration and limit_duration is not None:
                try:
                    seconds = self._parse_duration(duration)
                except ValueError:
                    seconds = None # unparseable, treat like a missing duration
                if seconds is not None and seconds >= limit_duration:
                    print(f"Skipping videoId {video_id} - {repr(title)} because its duration {duration} reaches the limit of {limit_duration} seconds.")
                    continue

            try:
                self.download_song(video_id, dest_dir)
            except Exception as err:
                # best effort: report the failure and carry on with the next song
                print(f"Exception caught while trying to download videoId {video_id} - {repr(title)}: {err}")
# Add the mixin to ytmusicapi class, creating our very own frankentype
class YTMusic(ytmusicapi.YTMusic, DownloaderMixin):
    ''' ytmusicapi.YTMusic with the download methods of DownloaderMixin added to it. '''
# A simple example you can run from the cli:
if __name__ == "__main__":
    usage = '''Missing file "headers_auth.json"... see ytmusicapi.readthedocs.org for explanation of how to use an
authenticated watch page request in a signed-in browser and the browser devtools to set up headers_auth.json for
ytmusicapi '''
    if not os.path.exists("headers_auth.json"):
        print(usage)
        # a bare `exit` only names the builtin without calling it, which let the script
        # carry on and fail further down; actually stop here, with a failure status
        sys.exit(1)
    ytm = YTMusic("headers_auth.json")
    # EXAMPLE - download the songs in your playback history
    ytm.download_playlist(ytm.get_history())
    # EXAMPLE - download the most recent 1000 songs you liked
    ytm.download_playlist(ytm.get_liked_songs(limit=1000))
''' If you were a Google Play Music fan like I was, then I feel your pain.
As though finding out that Play Music was being shut down weren't bad enough,
being given the option of a Takeout archive containing only uploaded tracks,
not any purchased content (even though it was downloadable through Play Music),
or transferring your music library over to Youtube Music (which admittedly does
bear a passing resemblance, at least cosmetically, to Play Music now) where you
can still stream it all you want... I wanted those purchased songs I could
buy and then download to make a mixtape in <your chosen DAW> or just listen to
now that I am too broke to be able to afford an internet connection. '''
@un1tz3r0
Copy link

Hey does this script still work for you? I wrote it ages ago and last time I tried to get it going I had a hell of a time trying to find the right cookies in the authenticated watch page's headers in my browser. Either way, glad you liked it, I spent a fair amount of time figuring out how to get the two libraries to work together to bypass the obnoxious amount of indirection that they employ. Security by obscurity.

@sbusso
Copy link
Author

sbusso commented Oct 26, 2023

I could not say, I used this a couple of months ago but YT changes its platform often so it is possible it is outdated.

@sbusso
Copy link
Author

sbusso commented Oct 26, 2023

@un1tz3r0 checking for what I did, I am not sure I got it working and ended up using other tools, but this was good code to play with

@un1tz3r0
Copy link

@sbusso thanks i have a ton of stuff like this kicking around. some day it will make an interesting archaeological find i'm sure

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment