Last active
March 6, 2018 23:06
-
-
Save roadmapper/376604a5e2f01ec96c0906905e4290d8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
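SoundCloud Downloader
~~~~~~~~~~~~~~~~~~~~~

Downloads the liked tracks of the authenticated SoundCloud user, or a single
track selected by id, and writes ID3 tags to the resulting MP3 files.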
""" | |
import argparse | |
import time | |
import json | |
import os | |
import logging | |
import re | |
import dateutil.parser | |
import requests | |
from mutagen.id3 import ID3 | |
from mutagen.id3._frames import TIT2, WOAF, TPE1, TALB, TSRC, TCOP, TDRC, TCON, APIC | |
# Base URL of the public SoundCloud API; used for the ``/me`` user lookup.
SC_API_BASE_URL = "https://api.soundcloud.com"

# Base URL of the v2 API; used for single-track and liked-track requests.
SC_API_V2_BASE_URL = "https://api-v2.soundcloud.com"
def __setup_log(debug):
    """Configure the module logger to print to the console.

    Args:
        debug: When truthy the logger and its handler use ``DEBUG``;
            otherwise both use ``INFO``.
    """
    verbosity = logging.DEBUG if debug else logging.INFO
    logger.setLevel(verbosity)

    # Console handler that filters at the same threshold as the logger.
    console = logging.StreamHandler()
    console.setLevel(verbosity)
    console.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

    logger.addHandler(console)
def _request_stream(url, params=None):
    """GET *url* in streaming mode; return the response, or None on failure."""
    try:
        response = requests.get(url, params=params, stream=True)
    except requests.RequestException as err:
        logger.debug('Request for %s failed: %s', url, err)
        return None
    logger.debug(response.status_code)
    if not response.ok:
        # Release the connection; the error body is of no use here.
        response.close()
        return None
    return response


def _locate_stream_url(track, token):
    """Ask the streams endpoint for the track's MP3 URL; None on failure."""
    endpoint = SC_API_BASE_URL + '/i1/tracks/' + str(track['id']) + '/streams'
    logger.debug(endpoint)
    try:
        lookup = requests.get(endpoint, params={'oauth_token': token})
        lookup.raise_for_status()
        return lookup.json()['http_mp3_128_url']
    except (requests.RequestException, ValueError, KeyError) as err:
        logger.debug('Stream lookup failed: %s', err)
        return None


def _open_download(track, token):
    """Return ``(response, url)`` for the best available source, or None."""
    original_url = track.get('download_url')
    if original_url is not None:
        logger.debug(original_url)
        response = _request_stream(original_url, params={'oauth_token': token})
        if response is not None:
            # Use the final URL (after any redirect) to derive the extension.
            return response, response.url

    stream_url = _locate_stream_url(track, token)
    if stream_url is None:
        return None
    logger.debug(stream_url)
    response = _request_stream(stream_url)
    if response is None:
        return None
    return response, stream_url


def download_file(track, token):
    """Download one track into the current directory and tag it.

    The file behind ``download_url`` is preferred. When that key is absent,
    ``None``, or the request fails, the URL reported by the streams endpoint
    (``http_mp3_128_url``) is downloaded instead. A track that cannot be
    fetched either way is logged with a warning and skipped instead of
    aborting the whole run.

    Args:
        track: Track dict from the API. ``id`` and ``title`` are read here;
            ``download_url`` is optional.
        token: OAuth token sent with the API requests. It is passed as a
            query parameter and kept out of the debug log.

    Returns:
        None.
    """
    source = _open_download(track, token)
    if source is None:
        logger.warning('Could not download %s', track['title'])
        return
    response, download_path = source

    # Strip the query string (if any) before taking the extension; the
    # previous ``index('?')`` raised ValueError for URLs without one.
    extension = os.path.splitext(download_path.split('?', 1)[0])[-1]
    file_name = sanitize_filename(track['title']) + extension
    logger.debug('Downloading file %s', file_name)
    try:
        with open(file_name, 'wb') as fd:
            # 64 KiB chunks: streamed from the socket, not held in memory.
            for chunk in response.iter_content(chunk_size=64 * 1024):
                fd.write(chunk)
    finally:
        response.close()

    tag_file(track, file_name)
    logger.info('Downloaded file %s', file_name)
    # Pause between downloads (presumably to avoid hammering the API).
    time.sleep(0.5)
def sanitize_filename(file_name):
    """Turn a track title into a string that is safe to use as a file name.

    Slashes, backslashes, colons, asterisks, question marks, double quotes,
    angle brackets, pipes and dots are each replaced with ``-``; ``&`` is
    spelled out as ``and``; apostrophes are dropped.

    Because every dot is replaced, the result can never start with ``.``,
    so no separate hidden-file check is needed.

    Args:
        file_name (str): Raw title.

    Returns:
        str: The sanitized name, or ``'untitled'`` when nothing is left
        (for example for an empty title), so callers always get a usable
        base name.
    """
    sanitized = re.sub(r'[/\\:*?"<>|.]', '-', file_name)
    sanitized = sanitized.replace('&', 'and').replace("'", '')
    return sanitized or 'untitled'
def _fetch_artwork(artwork_url):
    """Download cover art, trying the larger renditions first.

    Returns ``(image_bytes, content_type)`` for the first URL that answers
    successfully, or None when every candidate fails.
    """
    candidates = [artwork_url.replace('large', size)
                  for size in ('t500x500', 'crop', 't300x300')]
    candidates.append(artwork_url)
    for candidate in candidates:
        try:
            response = requests.get(candidate)
            response.raise_for_status()
        except requests.RequestException:
            continue
        logger.debug(candidate)
        return response.content, response.headers.get('Content-Type')
    return None


def _artwork_mime(artwork_url, content_type):
    """Pick the image MIME type from the URL suffix, else the response header."""
    lowered = artwork_url.lower()
    if lowered.endswith(('jpg', 'jpeg')):
        return 'image/jpeg'
    if lowered.endswith('png'):
        return 'image/png'
    if lowered.endswith('gif'):
        return 'image/gif'
    if content_type and content_type.startswith('image/'):
        return content_type.split(';', 1)[0].strip()
    # Last resort: a valid type rather than the bare 'image/' prefix.
    return 'image/jpeg'


def tag_file(track, file_name):
    """Write ID3 tags built from the track metadata to an MP3 file.

    Files whose name does not end in ``.mp3`` are left untouched. Only
    ``title`` is required in *track*; every other field is optional and is
    skipped when missing or ``None``.

    Args:
        track: Track dict from the API. Keys read: ``title``,
            ``publisher_metadata`` (``artist``, ``album_title``, ``isrc``,
            ``c_line``, ``p_line``), ``purchase_url``, ``release_date``,
            ``genre`` and ``artwork_url``.
        file_name: Path of the downloaded file.
    """
    if not file_name.lower().endswith('.mp3'):
        return

    audio = ID3()
    audio.add(TIT2(encoding=3, text=track['title']))

    publisher = track.get('publisher_metadata') or {}
    if publisher.get('artist') is not None:
        audio.add(TPE1(encoding=3, text=publisher['artist']))
    if publisher.get('album_title') is not None:
        audio.add(TALB(encoding=3, text=publisher['album_title']))
    if publisher.get('isrc') is not None:
        audio.add(TSRC(encoding=3, text=publisher['isrc']))

    # Both copyright lines go into TCOP. Adding two TCOP frames keeps only
    # the last one, so they are stored as two values of a single frame.
    copyright_lines = [publisher[key] for key in ('c_line', 'p_line')
                       if publisher.get(key) is not None]
    if copyright_lines:
        audio.add(TCOP(encoding=3, text=copyright_lines))

    if track.get('purchase_url') is not None:
        audio.add(WOAF(encoding=3, url=track['purchase_url']))

    release_date = track.get('release_date')
    if release_date is not None:
        try:
            year = dateutil.parser.parse(release_date).year
        except (ValueError, OverflowError) as err:
            # A malformed date should not prevent the other tags.
            logger.debug('Ignoring release date %r: %s', release_date, err)
        else:
            audio.add(TDRC(encoding=3, text=str(year)))

    if track.get('genre') is not None:
        audio.add(TCON(encoding=3, text=track['genre']))

    artwork_url = track.get('artwork_url')
    if artwork_url is not None:
        artwork = _fetch_artwork(artwork_url)
        if artwork is None:
            logger.debug('No artwork could be fetched for %s', track['title'])
        else:
            artwork_bytes, content_type = artwork
            audio.add(APIC(encoding=3,
                           mime=_artwork_mime(artwork_url, content_type),
                           data=artwork_bytes))

    audio.save(file_name)
def iterate_track_pages(path, token):
    """Collect the items of every page of a paginated API collection.

    Starting at *path*, each response's ``collection`` list is appended to
    the result and the ``next_href`` link is followed until a page has none.

    Args:
        path: URL of the first page. It may or may not already contain a
            query string; the token is merged in as a query parameter.
        token: OAuth token sent with every request.

    Returns:
        list: All collected items, in page order.

    Raises:
        requests.HTTPError: If a page request returns an error status.
    """
    tracks = []
    next_url = path
    while next_url:
        logger.debug(next_url)
        response = requests.get(next_url, params={'oauth_token': token})
        response.raise_for_status()
        page = response.json()
        tracks.extend(page.get('collection') or [])
        # A missing or empty ``next_href`` marks the last page.
        next_url = page.get('next_href')
    return tracks
def get_tracks(category, token, userid):
    """Fetch the track listing for *category* and dump it to ``data.txt``.

    Only liked tracks are retrieved at present: ``likes`` and ``all`` both
    return the user's likes, while ``reposts`` and ``tracks`` return an empty
    list. A warning is logged whenever the request asks for more than is
    fetched.

    Args:
        category: One of ``likes``, ``reposts``, ``tracks`` or ``all``.
        token: OAuth token for the API requests.
        userid: Id of the user whose listing is read.

    Returns:
        list: The collected items, as returned by ``iterate_track_pages``.
    """
    limit = 200
    tracks = []

    if category in ('likes', 'all'):
        path = ('/users/' + str(userid)
                + '/track_likes?linked_partitioning=1&limit=' + str(limit))
        logger.debug(path)
        tracks.extend(iterate_track_pages(SC_API_V2_BASE_URL + path, token))

    # Disabled listings, kept for reference:
    #   reposts: /stream/users/<userid>/reposts?linked_partitioning=1&limit=<limit>
    #   tracks:  /users/<userid>/tracks?linked_partitioning=1&limit=<limit>
    # NOTE(review): main() reads item['track'] from each entry; confirm these
    # endpoints return that shape before enabling them.
    if category != 'likes':
        logger.warning('Only liked tracks are currently downloaded; '
                       'reposts and uploaded tracks are skipped')

    # Keep a copy of the raw listing next to the downloads.
    with open('data.txt', 'w') as outfile:
        json.dump(tracks, outfile)
    logger.info('Got file names')
    return tracks
def main():
    """Parse the command line and download the requested tracks.

    With ``--track-id`` only that track is fetched and downloaded. Otherwise
    the authenticated user's id is looked up and every track returned by
    ``get_tracks`` for the chosen category is downloaded.

    Raises:
        requests.HTTPError: If the track or user lookup returns an error
            status.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("category",
                        choices=['likes', 'reposts', 'tracks', 'all'],
                        help="likes, reposts, tracks, all")
    parser.add_argument('--oauth-token',
                        nargs='?',
                        help="SoundCloud OAuth2 token",
                        required=True)
    parser.add_argument('-t', '--track-id', nargs='?', help='track id')
    parser.add_argument('--debug',
                        help="use for debug output",
                        action="store_true")
    args = parser.parse_args()

    oauth_token = args.oauth_token
    # nargs='?' lets the flag appear without a value, which yields None.
    if not oauth_token:
        parser.error('--oauth-token requires a value')

    __setup_log(args.debug)
    auth = {'oauth_token': oauth_token}

    if args.track_id:
        r = requests.get(SC_API_V2_BASE_URL + '/tracks/' + str(args.track_id),
                         params=auth)
        r.raise_for_status()
        download_file(r.json(), oauth_token)
        return

    # The user id is only needed for listings, so look it up here.
    me = requests.get(SC_API_BASE_URL + '/me', params=auth)
    me.raise_for_status()
    userid = me.json()['id']

    for item in get_tracks(args.category, oauth_token, userid):
        track = item.get('track')
        if track is None:
            logger.debug('Skipping entry without a track: %s', item)
            continue
        download_file(track, oauth_token)
# Logger shared by the functions in this file. It is created at module level,
# outside the __main__ guard, so the functions also work when this file is
# imported rather than run as a script.
logger = logging.getLogger(__file__)

if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment