|
from soundcloud import Client |
|
from requests import get |
|
from os.path import join as path_join |
|
from mutagen.mp3 import MP3 |
|
from mutagen import File |
|
from mutagen.id3 import ID3, ID3NoHeaderError, TIT2, TPE1, TPE2, TALB, TRCK, APIC |
|
|
|
class Downloader: |
|
def __init__(self, client_id): |
|
self.client = Client(client_id=client_id) |
|
self.clear_current_data() |
|
|
|
def clear_current_data(self): |
|
self.current_data = { |
|
'user': None, |
|
'playlist': None, |
|
'tracks': None, |
|
'track': None |
|
} |
|
|
|
def generate_tags(self, location): |
|
user = self.current_data['user'] |
|
playlist = self.current_data['playlist'] |
|
tracks = self.current_data['tracks'] |
|
track = self.current_data['track'] |
|
if user is None or track is None: |
|
print(' Had an issue tagging the current song. Skipped.') |
|
return |
|
try: |
|
mp3 = MP3(location) |
|
except: |
|
print(' Could not tag a non-MP3 file.') |
|
return |
|
try: |
|
mp3.add_tags() |
|
except: |
|
pass # Has tags |
|
|
|
mp3.tags.add(TIT2(text=track['title'], encoding=3)) # Title |
|
mp3.tags.add(TPE1(text=user.username, encoding=3)) # Artist |
|
mp3.tags.add(TPE2(text=user.username, encoding=3)) # Album artist |
|
if playlist is not None: |
|
mp3.tags.add(TALB(text=playlist.title, encoding=3)) # Album |
|
if tracks is not None: |
|
mp3.tags.add(TRCK(text='{}/{}'.format(tracks.collection.index(track) + 1, playlist.track_count), encoding=3)) # Track number and total |
|
artwork_url = self.resize(track['artwork_url']) |
|
url_parts = [x.lower() for x in artwork_url.split('.')] |
|
mp3.tags.add( # Cover art |
|
APIC( |
|
encoding=3, |
|
mime='image/jpeg' if url_parts[-1] in ['jpg', 'jpeg'] else 'image/png', |
|
type=3, |
|
desc='Cover', |
|
data=get(artwork_url).content |
|
) |
|
) |
|
mp3.save() |
|
|
|
def resize(self, url, to='500x500'): |
|
parts = url.split('-') |
|
end_parts = parts[-1].split('.') |
|
end_parts[0] = 't{}'.format(to) |
|
parts[-1] = '.'.join(end_parts) |
|
return '-'.join(parts) |
|
|
|
def download_track(self, track, location): |
|
self.current_data['track'] = track |
|
if self.current_data['user'] is None or self.current_data['user'].id != track['user_id']: |
|
self.current_data['user'] = self.client.get('/users/{}'.format(track['user_id'])) |
|
if 'download_url' in track: |
|
print(' Downloading source file') |
|
url = track['download_url'] + '?client_id=' + self.client.client_id |
|
else: |
|
print(' Downloading stream. Source file unavailable.') |
|
stream = self.client.get(track['stream_url'], allow_redirects=False) |
|
url = stream.location |
|
with open(location, 'wb') as f: |
|
f.write(get(url).content) |
|
self.generate_tags(location) |
|
|
|
def download_tracks(self, tracks, directory): |
|
print(' Downloading {} track{}.'.format(len(tracks.collection), 's' if len(tracks.collection) != 1 else '')) |
|
for track in tracks.collection: |
|
print(' Downloading {}.'.format(track['title'])) |
|
title = self.sanitize_track_name(track['title']) |
|
self.download_track(track, path_join(directory, title + '.mp3')) |
|
|
|
def download_paginated(self, url): |
|
return self.client.get(url, limit=200, linked_partitioning=1) |
|
|
|
def download_playlist(self, playlist_url, directory): |
|
self.clear_current_data() |
|
playlist = self.client.get('/resolve', url=playlist_url) |
|
self.current_data['playlist'] = playlist |
|
tracks = self.download_paginated('/playlists/{playlist_id}/tracks'.format(playlist_id=playlist.id)) |
|
self.current_data['tracks'] = tracks |
|
self.download_all_tracks(tracks, directory) |
|
|
|
def download_all_tracks(self, paged_collection, directory): |
|
self.download_tracks(paged_collection, directory) |
|
while 'next_href' in paged_collection.keys(): |
|
paged_collection = self.client.get(paged_collection.next_href) |
|
self.download_tracks(paged_collection, directory) |
|
|
|
def sanitize_track_name(self, name): |
|
return name.replace('/', ':') |
|
|
|
def download_all_user_tracks(self, user, directory): |
|
print('Downloading all tracks from {}.'.format(user.username)) |
|
user_tracks = self.download_paginated('/users/{user_id}/tracks'.format(user_id=user.id)) |
|
self.download_all_tracks(user_tracks, directory) |
|
|
|
def download_track_link(self, track_link, directory): |
|
self.clear_current_data() |
|
track = self.client.get('/resolve', url=track_link) |
|
self.download_track(track.obj, path_join(directory, self.sanitize_track_name(track.title) + '.mp3')) |
|
|
|
def get_user(self, user_link): |
|
user = self.client.get('/resolve', url=user_link) |
|
self.current_data['user'] = user |
|
return user |
|
|
|
def download_user(self, user_link, directory): |
|
self.clear_current_data() |
|
self.download_all_user_tracks(self.get_user(user_link), directory) |