Skip to content

Instantly share code, notes, and snippets.

@KeizerDev
Created February 3, 2016 22:50
Show Gist options
  • Save KeizerDev/8e14540befe089631507 to your computer and use it in GitHub Desktop.
Save KeizerDev/8e14540befe089631507 to your computer and use it in GitHub Desktop.
spotify2youtubedownload
#!/usr/bin/python
#title :spotify_downloader.py
#description :This script downloads the music currently playing on Spotify (Linux) from YouTube.
#source :https://github.com/xfilipe/spotify-music-downloader
#author :xfilipe
#date :20160203
#version :0.1.1
#usage :python spotify_downloader.py
#python_version :2.7.6
#==============================================================================
# Import the modules needed to run the script.
from __future__ import unicode_literals
import dbus
from dbus.mainloop.glib import DBusGMainLoop
from youtube_dl import YoutubeDL
import gobject, shutil, uuid
from os import makedirs
from os.path import expanduser, isfile, exists
import sys, traceback
from inspect import getmodule
from multiprocessing import Pool
import signal
from difflib import SequenceMatcher as SM
from operator import itemgetter
# configs
# Downloads are stored in a "Music" folder under the current user's home
# directory.
home = expanduser("~")
MUSIC_FOLDER_ABSOLUTE_PATH = '/'.join((home, 'Music'))
class MyLogger(object):
    """Logger object handed to YoutubeDL through the ``logger`` option.

    Debug and warning messages are discarded; error messages are printed.
    """

    def debug(self, msg):
        """Discard a debug message."""

    def warning(self, msg):
        """Discard a warning message."""

    def error(self, msg):
        """Print an error message to standard output."""
        print(msg)
def yt_hook(d):
    """Progress hook: redraw a one-line status while a file is downloading.

    Args:
        d: status dictionary passed to the hook. Only events whose
            ``status`` is ``'downloading'`` produce output; the keys
            ``filename``, ``_percent_str`` and ``_speed_str`` are displayed
            when present.
    """
    if d.get('status') != 'downloading':
        return

    # Local import: the module-level imports only pull a few os.path names.
    from os.path import splitext

    # Keep the last dash-separated piece of the file name and drop its
    # extension whatever its length (slicing off 4 characters mangled
    # names ending in ".webm" or ".opus").
    label = splitext(d.get('filename', '').split('-')[-1])[0]
    status_line = ' '.join((
        '\r' + 'Download:' + label,
        d.get('_percent_str', ''),
        d.get('_speed_str', ''),
    ))
    # No trailing newline: the leading '\r' makes the next update overwrite
    # this one on the same terminal line.
    sys.stdout.write(status_line)
    sys.stdout.flush()
# decorator to send download function to background
def async(decorated):
module = getmodule(decorated)
decorated.__name__ += str('_original')
setattr(module, decorated.__name__, decorated)
def send(*args, **opts):
return async.pool.apply_async(decorated, args, opts)
return send
def spotify_grabber(metadata):
    """Extract title, album and artist from a metadata mapping and print them.

    Args:
        metadata: mapping with the keys ``xesam:title``, ``xesam:album`` and
            ``xesam:artist``. A value that is a ``dbus.Array`` is replaced
            by its first element.

    Returns:
        dict with the keys ``title``, ``album`` and ``artist``; each value
        is the corresponding text with non-ASCII characters dropped.

    Raises:
        KeyError: if one of the three keys is missing.
    """
    raw_values = [
        metadata['xesam:title'],
        metadata['xesam:album'],
        metadata['xesam:artist'],
    ]
    title, album, artist = [
        value[0] if isinstance(value, dbus.Array) else value
        for value in raw_values
    ]

    print('----- spotify info -----')
    print('Title: ' + title)
    print('Artist: ' + artist)
    print('Album: ' + album)
    print('------------------------')

    def ascii_only(text):
        # Drop every character that cannot be encoded as ASCII.
        return str(text.encode('ascii', 'ignore'))

    return {
        'title': ascii_only(title),
        'album': ascii_only(album),
        'artist': ascii_only(artist)
    }
@async
def youtube_grabber(music_data):
query_url = 'https://www.youtube.com/results?search_query='+music_data.get('artist')+'+'+music_data.get('title')+'+%23music'
yt_opts = {
'logger': MyLogger()
}
with YoutubeDL(yt_opts) as ydl:
result = ydl.extract_info(query_url, download=False, process=False)
for entry in result['entries']:
if 'full album' in entry['title'].lower():
entry['ratio'] = 0.01
else:
entry['ratio'] = SM(None, music_data.get('artist')+' - '+music_data.get('title'), entry['title']).ratio()
ordered_list = sorted(result['entries'], key=itemgetter('ratio'), reverse=True)
for i in range(len(ordered_list)):
print '----- youtube downloading -----'
print 'Title: '+ordered_list[i]["title"]
print 'URL: '+ordered_list[i]["url"]
print 'attempt to download %s' % (i+1)
result = youtube_downloader(ordered_list[i]["title"], ordered_list[i]["url"], music_data)
if result:
return
print 'Sorry, downloadable videos for this music was not found'
def youtube_downloader(video_title, video_url, music_data):
    """Download one video as an MP3 into the music library.

    The audio is first written to a uniquely named file under /tmp and
    then moved to
    ``<MUSIC_FOLDER_ABSOLUTE_PATH>/<artist>/<album>/<video title>.mp3``.

    Args:
        video_title: title of the video; used for the file names (any '/'
            is replaced by '_' so it cannot act as a path separator).
        video_url: URL handed to youtube-dl.
        music_data: dict with the keys ``artist``, ``album`` and ``title``.

    Returns:
        True if the file already existed or was downloaded and moved into
        place; False if the download failed or produced no MP3 file.
    """
    # youtube_dl is already a dependency of this module; imported locally
    # because only YoutubeDL is imported at file level.
    from youtube_dl.utils import DownloadError

    safe_title = video_title.replace('/', '_')
    output_folder_path = '%s/%s/%s' % (MUSIC_FOLDER_ABSOLUTE_PATH, music_data.get('artist'), music_data.get('album'))
    output_file_path = '%s/%s.mp3' % (output_folder_path, safe_title)
    temp_base_path = '/tmp/%s-%s' % (uuid.uuid4(), safe_title)

    # check if file already exists
    if isfile(output_file_path):
        print("Music alread downloaded: %s" % output_file_path)
        return True

    # create folder if needed
    if not exists(output_folder_path):
        makedirs(output_folder_path)

    yt_opts = {
        'format': 'bestaudio/best',
        # The output template is %-formatted by youtube-dl, so a literal
        # '%' coming from the title has to be doubled.
        'outtmpl': temp_base_path.replace('%', '%%') + '.%(ext)s',
        'extractaudio': True,
        'noplaylist': True,
        'postprocessors': [
            {
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            },
            {
                'key': 'FFmpegMetadata'
            }
        ],
        'logger': MyLogger(),
        'progress_hooks': [yt_hook],
    }
    try:
        with YoutubeDL(yt_opts) as ydl:
            result = ydl.download([video_url])
    except DownloadError:
        # Report failure through the return value so the caller can move on
        # to its next candidate instead of aborting.
        result = 1

    downloaded_file_path = temp_base_path + '.mp3'
    # Success requires both a zero return code and the converted file.
    if result != 0 or not isfile(downloaded_file_path):
        print('Error: File was not downloaded...')
        return False

    shutil.move(downloaded_file_path, output_file_path)
    print('\n' + music_data.get('title') + '\'s download finished =) ')
    return True
def get_playing_now(session_bus):
    """Read the Spotify player's current metadata and hand it to spotify_handler.

    Args:
        session_bus: bus connection used to reach the
            ``org.mpris.MediaPlayer2.spotify`` service.
    """
    player_object = session_bus.get_object("org.mpris.MediaPlayer2.spotify", "/org/mpris/MediaPlayer2")
    properties = dbus.Interface(player_object, "org.freedesktop.DBus.Properties")
    current_metadata = properties.Get("org.mpris.MediaPlayer2.Player", "Metadata")
    spotify_handler(current_metadata)
def spotify_handler(*args):
    """Start a download for the track described by the given arguments.

    Accepts either of the two call shapes used in this module:

    * a signal callback, where the second positional argument is a mapping
      that may contain a ``"Metadata"`` entry;
    * a direct call with the metadata mapping as the only argument.

    Calls that carry no track metadata (for example a property change
    without a ``"Metadata"`` entry, or metadata without ``xesam:title``)
    are ignored instead of raising.

    Returns:
        None.
    """
    try:
        metadata = args[1]["Metadata"]
    except (IndexError, KeyError, TypeError):
        # Not a signal carrying Metadata: treat the first argument as the
        # metadata mapping itself.
        metadata = args[0] if args else None

    # Anything that is not a mapping describing a track (e.g. the interface
    # name of an unrelated property change) is silently skipped.
    if not hasattr(metadata, 'keys') or 'xesam:title' not in metadata:
        return

    print('----- DOWNLOAD STARTED - To exit Ctrl-C -----')
    music_data = spotify_grabber(metadata)
    # Block until the background download has finished.
    youtube_grabber(music_data).get()
def worker():
    """Pool initializer: make the calling process ignore SIGINT."""
    signal_number, action = signal.SIGINT, signal.SIG_IGN
    signal.signal(signal_number, action)
def main():
DBusGMainLoop(set_as_default=True)
async.pool = Pool(4, worker)
session_bus = dbus.SessionBus()
try:
# look for spotify signals (new music starts)
session_bus.add_signal_receiver(spotify_handler, 'PropertiesChanged', None, 'org.mpris.MediaPlayer2.spotify', '/org/mpris/MediaPlayer2')
# if theres a music playing now, download it
get_playing_now(session_bus)
except (KeyboardInterrupt, SystemExit):
async.pool.terminate()
async.pool.join()
print 'bye'
sys.exit(666)
loop = gobject.MainLoop()
loop.run()
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment