Created
February 3, 2016 22:50
-
-
Save KeizerDev/8e14540befe089631507 to your computer and use it in GitHub Desktop.
spotify2youtubedownload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
#title :spotify_downloader.py | |
#description :This script downloads the music currently playing on Spotify (Linux) from YouTube.
#source :https://github.com/xfilipe/spotify-music-downloader | |
#author :xfilipe | |
#date :20160203 | |
#version :0.1.1 | |
#usage :python spotify_downloader.py | |
#python_version :2.7.6 | |
#============================================================================== | |
# Import the modules needed to run the script. | |
from __future__ import unicode_literals | |
import dbus | |
from dbus.mainloop.glib import DBusGMainLoop | |
from youtube_dl import YoutubeDL | |
import gobject, shutil, uuid | |
from os import makedirs | |
from os.path import expanduser, isfile, exists | |
import sys, traceback | |
from inspect import getmodule | |
from multiprocessing import Pool | |
import signal | |
from difflib import SequenceMatcher as SM | |
from operator import itemgetter | |
# configs
# Home directory of the user running the script.
home = expanduser("~")
# Root folder of the music library; downloads are filed below it by
# artist and album.
MUSIC_FOLDER_ABSOLUTE_PATH = '%s/%s' % (home, 'Music')
class MyLogger(object):
    """Logger object passed to youtube_dl through its 'logger' option.

    Debug and warning messages are dropped; only errors are shown.
    """

    def debug(self, msg):
        """Discard a debug message."""
        return None

    def warning(self, msg):
        """Discard a warning message."""
        return None

    def error(self, msg):
        """Print an error message to standard output."""
        print(msg)
def yt_hook(d):
    """Progress hook for youtube_dl: redraw a one-line download status.

    Args:
        d: status dictionary supplied by youtube_dl. Only 'status',
            'filename', '_percent_str' and '_speed_str' are read.

    Nothing is printed unless d['status'] is 'downloading'.
    """
    if d['status'] != 'downloading':
        return

    from os.path import splitext

    # Strip the real extension, whatever its length (.m4a, .webm, ...),
    # then keep the text after the last '-' of the file name.
    name = splitext(d['filename'])[0].split('-')[-1]
    # The '_'-prefixed keys look like private youtube_dl fields, so a
    # missing one is rendered as empty text instead of raising KeyError.
    percent = d.get('_percent_str', '')
    speed = d.get('_speed_str', '')
    # Write the line in one go: '\r' returns to column 0 so the next
    # update overwrites this one, and no separator is left behind.
    sys.stdout.write('\r' + 'Download:' + name + ' ' + percent + ' ' + speed)
    sys.stdout.flush()
# decorator to send download function to background | |
def async(decorated): | |
module = getmodule(decorated) | |
decorated.__name__ += str('_original') | |
setattr(module, decorated.__name__, decorated) | |
def send(*args, **opts): | |
return async.pool.apply_async(decorated, args, opts) | |
return send | |
def _first_value(value):
    """Return the first item of a list-like value, or the value itself.

    An empty list yields '' instead of raising IndexError.
    """
    if isinstance(value, list):  # dbus.Array is a list subclass
        return value[0] if value else ''
    return value


def _ascii_str(text):
    """Drop non-ASCII characters from *text* and return a native str."""
    encoded = text.encode('ascii', 'ignore')
    if isinstance(encoded, str):
        return encoded  # Python 2: encode() already produced a str
    return encoded.decode('ascii')


def spotify_grabber(metadata):
    """Extract title, album and artist from MPRIS track metadata.

    Args:
        metadata: mapping with the keys 'xesam:title', 'xesam:album' and
            'xesam:artist'. Each value is either a string or a list of
            strings, in which case the first element is used.

    Returns:
        dict with the keys 'title', 'album' and 'artist', each an
        ASCII-only string (non-ASCII characters are dropped).

    Raises:
        KeyError: if one of the three metadata keys is missing.
    """
    title = _first_value(metadata['xesam:title'])
    album = _first_value(metadata['xesam:album'])
    artist = _first_value(metadata['xesam:artist'])

    print('----- spotify info -----')
    print('Title: ' + title)
    print('Artist: ' + artist)
    print('Album: ' + album)
    print('------------------------')

    return {
        'title': _ascii_str(title),
        'album': _ascii_str(album),
        'artist': _ascii_str(artist),
    }
@async | |
def youtube_grabber(music_data): | |
query_url = 'https://www.youtube.com/results?search_query='+music_data.get('artist')+'+'+music_data.get('title')+'+%23music' | |
yt_opts = { | |
'logger': MyLogger() | |
} | |
with YoutubeDL(yt_opts) as ydl: | |
result = ydl.extract_info(query_url, download=False, process=False) | |
for entry in result['entries']: | |
if 'full album' in entry['title'].lower(): | |
entry['ratio'] = 0.01 | |
else: | |
entry['ratio'] = SM(None, music_data.get('artist')+' - '+music_data.get('title'), entry['title']).ratio() | |
ordered_list = sorted(result['entries'], key=itemgetter('ratio'), reverse=True) | |
for i in range(len(ordered_list)): | |
print '----- youtube downloading -----' | |
print 'Title: '+ordered_list[i]["title"] | |
print 'URL: '+ordered_list[i]["url"] | |
print 'attempt to download %s' % (i+1) | |
result = youtube_downloader(ordered_list[i]["title"], ordered_list[i]["url"], music_data) | |
if result: | |
return | |
print 'Sorry, downloadable videos for this music was not found' | |
def _safe_filename(name):
    """Make *name* usable as a single path component by replacing '/'."""
    return name.replace('/', '_')


def youtube_downloader(video_title, video_url, music_data):
    """Download one video as an MP3 into the music library.

    The audio is fetched into /tmp under a unique name, converted to MP3
    by the youtube_dl post-processors, then moved to
    MUSIC_FOLDER_ABSOLUTE_PATH/<artist>/<album>/<video title>.mp3.

    Args:
        video_title: title of the video, used as the file name.
        video_url: URL (or id) handed to YoutubeDL.download.
        music_data: dict with the keys 'artist', 'album' and 'title'.

    Returns:
        True if the file already existed or was downloaded and moved
        into place, False if the download failed.
    """
    from youtube_dl.utils import DownloadError

    # A '/' in the title would otherwise be read as a directory separator.
    file_title = _safe_filename(video_title)
    output_folder_path = '%s/%s/%s' % (MUSIC_FOLDER_ABSOLUTE_PATH, music_data.get('artist'), music_data.get('album'))
    output_file_path = '%s/%s.mp3' % (output_folder_path, file_title)
    temp_base_path = '/tmp/%s-%s' % (uuid.uuid4(), file_title)

    # check if file already exists
    if isfile(output_file_path):
        print("Music alread downloaded: %s" % output_file_path)
        return True

    # create folder if needed
    if not exists(output_folder_path):
        makedirs(output_folder_path)

    yt_opts = {
        'format': 'bestaudio/best',
        # outtmpl is a %-format template: escape any literal '%' coming
        # from the title and leave only the extension placeholder active.
        'outtmpl': temp_base_path.replace('%', '%%') + '.%(ext)s',
        'extractaudio': True,
        'noplaylist': True,
        'postprocessors': [
            {
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            },
            {
                'key': 'FFmpegMetadata'
            }
        ],
        'logger': MyLogger(),
        'progress_hooks': [yt_hook],
    }

    try:
        with YoutubeDL(yt_opts) as ydl:
            result = ydl.download([video_url])
    except DownloadError:
        # Report failure instead of propagating, so the caller can move
        # on to its next candidate video.
        return False

    temp_file_path = temp_base_path + '.mp3'
    if result != 0 or not isfile(temp_file_path):
        print('Error: File was not downloaded...')
        return False

    shutil.move(temp_file_path, output_file_path)
    print('\n'+music_data.get('title')+'\'s download finished =) ')
    return True
def get_playing_now(session_bus):
    """Read the current track metadata from Spotify and pass it on.

    Args:
        session_bus: D-Bus session bus used to reach the Spotify MPRIS
            object.

    The 'Metadata' property of the MPRIS player interface is read and
    handed to spotify_handler.
    """
    player = session_bus.get_object("org.mpris.MediaPlayer2.spotify", "/org/mpris/MediaPlayer2")
    properties = dbus.Interface(player, "org.freedesktop.DBus.Properties")
    spotify_handler(properties.Get("org.mpris.MediaPlayer2.Player", "Metadata"))
def spotify_handler(*args):
    """Start a download for the track described by the given metadata.

    Accepts two call shapes:

    * ``spotify_handler(metadata)`` - a direct call with the metadata.
    * ``spotify_handler(interface, changed, invalidated)`` - the
      'PropertiesChanged' signal, where the metadata is expected under
      ``changed["Metadata"]``.

    Signals that carry no 'Metadata' entry are ignored. The call blocks
    until the background download has finished.
    """
    if len(args) == 1:
        metadata = args[0]
    else:
        try:
            metadata = args[1]["Metadata"]
        except (IndexError, KeyError, TypeError):
            # Not a metadata change; there is nothing to download.
            return

    print('----- DOWNLOAD STARTED - To exit Ctrl-C -----')
    music_data = spotify_grabber(metadata)
    youtube_grabber(music_data).get()
def worker():
    """Pool initializer: make the worker process ignore SIGINT."""
    ignore_handler = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_handler)
def main(): | |
DBusGMainLoop(set_as_default=True) | |
async.pool = Pool(4, worker) | |
session_bus = dbus.SessionBus() | |
try: | |
# look for spotify signals (new music starts) | |
session_bus.add_signal_receiver(spotify_handler, 'PropertiesChanged', None, 'org.mpris.MediaPlayer2.spotify', '/org/mpris/MediaPlayer2') | |
# if theres a music playing now, download it | |
get_playing_now(session_bus) | |
except (KeyboardInterrupt, SystemExit): | |
async.pool.terminate() | |
async.pool.join() | |
print 'bye' | |
sys.exit(666) | |
loop = gobject.MainLoop() | |
loop.run() | |
if __name__ == '__main__': | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment