Skip to content

Instantly share code, notes, and snippets.

@divadsn
Last active November 10, 2017 00:13
Show Gist options
  • Save divadsn/15eb8adcd59de8afcc67ff2f2d189c46 to your computer and use it in GitHub Desktop.
Save divadsn/15eb8adcd59de8afcc67ff2f2d189c46 to your computer and use it in GitHub Desktop.
Some hacky SoundCloud grabber for my radio station
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import eyed3
import soundcloud
import sqlite3
import sys
import re
import socket
import urllib
def remove_tags(rgx_list, text):
    """Remove bracketed groups that mention any of the given tags.

    A group is a run of text enclosed in ``(...)`` or ``[...]``. A group is
    deleted when one of the patterns in ``rgx_list`` occurs inside it
    (case-insensitively). Text outside brackets is never touched, and a match
    never extends past the closing bracket of the group it started in.

    :param rgx_list: iterable of regex fragments (plain words work as-is)
    :param text: the string to clean up
    :return: ``text`` with every matching bracket group removed; surrounding
             whitespace is left in place for the caller to strip
    """
    new_text = text
    for r in rgx_list:
        # Opening bracket, anything that is not a bracket, the tag, then
        # anything up to the first closing bracket. Excluding the closing
        # brackets before the tag keeps the match inside a single group, so
        # "(Original Mix) Free [Out Now]" no longer loses "(Original Mix)".
        rgx = re.compile(
            r"[\(\[][^\(\[\)\]]*" + r + r"[^\)\]]*[\)\]]",
            re.IGNORECASE,
        )
        new_text = rgx.sub('', new_text)
    return new_text
def parse_meta(trackname):
    """Split a SoundCloud track name into artist and title.

    The name is split at the first ``-``; if there is none, at the first
    ``|``. Everything before the separator is the artist, everything after
    it is the title. Promotional bracket groups (e.g. "[Free Download]") are
    removed from the title via ``remove_tags``.

    :param trackname: raw track title, e.g. ``"Artist - Title [Out Now]"``
    :return: ``{"artist": ..., "title": ...}`` or ``False`` when the name
             contains neither separator
    """
    # Check if it's valid for meta split ('-' takes precedence over '|')
    if '-' in trackname:
        separator = '-'
    elif '|' in trackname:
        separator = '|'
    else:
        return False
    # partition() splits exactly at the separator, so the first character of
    # the title is kept even when no space follows it ("Artist-Title").
    artist, _, title = trackname.partition(separator)
    artist = artist.strip()
    # Tags to remove from title
    tags = [
        "download",
        "free",
        "release",
        "monstercat",
        "out now",
        "available",
        "new artist week",
    ]
    # Remove tags and strip left-over whitespace
    new_title = remove_tags(tags, title).strip()
    return {"artist": artist, "title": new_title}
# Enforce utf8 encoding everywhere.
# Python 2 only: reload(sys) makes sys.setdefaultencoding available again so
# the implicit str/unicode conversions further down use UTF-8.
reload(sys)
sys.setdefaultencoding('utf8')
# Set default timeout to 10 seconds for sockets created from here on
socket.setdefaulttimeout(10)
# SoundCloud client id for API access
# NOTE(review): the credential is hard-coded in source - consider reading it
# from the environment or a config file.
sc_key = 'c6CU49JDMapyrQo06UxU9xouB9ZVzqCn'
# Establish database connection (relative path, so the file is resolved
# against the current working directory)
db = sqlite3.connect('tagindex.db')
# Create table if not exists: one row per track with its id, artist, title
# and the path of the mp3 file
db.execute("""CREATE TABLE IF NOT EXISTS tracks (
id INT NOT NULL PRIMARY KEY,
artist TEXT NOT NULL,
title TEXT NOT NULL,
file TEXT NOT NULL
)""")
db.commit()
# Working directory: the folder containing this script; it is the root of
# the music scan below
rootdir = os.path.dirname(os.path.realpath(__file__))
# Output directory for new tracks
outdir = os.path.join(rootdir, "monstercat")
# List of directory names to not check for music ('.temp' is the download
# staging folder created further down)
exclude = [
'.temp'
]
# Start rebuilding tag database
print "Rebuild music database..."
# Clear table before rebuilding; the index is repopulated from the mp3 files
# found on disk.
# NOTE(review): VACUUM is issued before the commit below - confirm the
# sqlite3 module in use commits the pending DELETE implicitly at this point.
db.execute("DELETE FROM tracks")
db.execute("VACUUM")
db.commit()
# Collect every .mp3 below rootdir, skipping the excluded directory names.
filelist = []
for subdir, dirs, files in os.walk(rootdir, topdown=True):
    # Pruning dirs in place stops os.walk from descending into excluded folders
    dirs[:] = [d for d in dirs if d not in exclude]
    for name in files:
        if name.endswith(".mp3"):
            filelist.append(os.path.join(subdir, name))
# Check count of listed mp3 files
print("Found " + str(len(filelist)) + " tracks to update.")
# Add all tracks to database for later use
for path in filelist:
    # Load ID3 tag info; eyed3 yields None for files it cannot handle and
    # leaves .tag as None when the file carries no tag at all.
    id3 = eyed3.load(path)
    if id3 is None or id3.tag is None:
        print("Skipping " + path + ": no readable ID3 tag.")
        continue
    tag = id3.tag
    if tag.artist is None or tag.title is None:
        print("Skipping " + path + ": missing artist or title tag.")
        continue
    # Extract track meta; the track number field holds the SoundCloud id
    # written by the download step of this script
    artist = tag.artist.decode("utf-8")
    title = tag.title.decode("utf-8")
    track_id = tag.track_num[0]
    # Store the path as unicode, like the other text columns
    path = path.decode("utf-8")
    try:
        # Add track to the table
        db.execute(
            "INSERT INTO tracks (id, artist, title, file) VALUES (?, ?, ?, ?)",
            (track_id, artist, title, path),
        )
    except Exception as err:
        # Best effort: a duplicate or missing id must not stop the rebuild
        print("Failed to add track " + title + " (" + str(track_id) + ")")
        print(err)
# Save changes to database
db.commit()
# Prepare SoundCloud API client
client = soundcloud.Client(client_id=sc_key)
# List of playlists to fetch from SoundCloud
playlists = [
    "https://soundcloud.com/monster-playlists/sets/monstercat",
    "https://soundcloud.com/monster-playlists/sets/monstercat2",
    "https://soundcloud.com/gamer-nation/sets/every-monstercat-song",
    "https://soundcloud.com/gamer-nation/sets/every-monstercat-song-pt-2",
    "https://soundcloud.com/gamer-nation/sets/every-monstercat-song-pt-3",
]
# List of tracks to download
queue = []
# Ids already queued, so a track present in several playlists is queued once
queued_ids = set()
# Fetch tracks from SoundCloud
print("\nDone! Fetching playlists from SoundCloud...")
for playlist in playlists:
    print("Fetching " + playlist + "...")
    # Resolve also returns the playlist with its contents, pretty neat
    try:
        info = client.get('/resolve', url=playlist)
    except Exception as err:
        # One unreachable or deleted playlist must not abort the whole run
        print("Failed to fetch " + playlist)
        print(err)
        continue
    # The resolved resource may not be a playlist at all
    tracks = getattr(info, 'tracks', None)
    if not tracks:
        print("No tracks found, is it a valid playlist url?")
        continue
    print(str(len(tracks)) + " tracks found, checking for new tracks...")
    temp = []
    # We will lookup the database, so we need a cursor
    cursor = db.cursor()
    # Search for every track and check if track is in our database
    for track in tracks:
        if track['id'] in queued_ids:
            continue
        # Check if we can retrieve track info from title for advanced search
        meta = parse_meta(track['title'])
        if meta:
            # Execute advanced search (search by id or by title and artist)
            cursor.execute(
                "SELECT * FROM tracks WHERE id = ? OR (artist LIKE ? AND title LIKE ?)",
                (str(track['id']), meta['artist'] + "%", meta['title'] + "%"),
            )
        else:
            # Execute basic search (search by id)
            cursor.execute("SELECT * FROM tracks WHERE id = ?", (str(track['id']),))
        # Already indexed locally: nothing to download
        if cursor.fetchone() is not None:
            continue
        # Add track to download queue if it exposes a usable stream url
        stream_url = track.get('stream_url')
        if stream_url and stream_url.startswith("http"):
            temp.append(track)
            queued_ids.add(track['id'])
        else:
            print("Unsupported stream url: " + str(stream_url))
    print("Added " + str(len(temp)) + " new tracks to download queue.")
    queue.extend(temp)
# It's time to download tracks!
print("\nA total " + str(len(queue)) + " tracks needs to be downloaded, preparing...")
# Prepare temporary download folder
tempdir = os.path.join(rootdir, ".temp")
if not os.path.exists(tempdir):
    os.makedirs(tempdir)
# Tracks whose name could not be split into artist and title
failed = []
# Amount of tracks before, used for the statistics printed at the end
cursor = db.cursor()
cursor.execute("SELECT Count(*) FROM tracks")
prev_amount = cursor.fetchone()[0]
# Start downloading missing tracks
for track in queue:
    # Prepare metadata for track
    trackname = track['title']
    # Check if it's valid title to parse meta
    meta = parse_meta(trackname)
    if not meta:
        print(trackname + " is not a valid track name, skipping.")
        failed.append(track)
        continue
    # Retrieve our parsed meta
    artist = meta['artist']
    title = meta['title']
    # Stream url with the API key attached, and the track id for later use
    url = track['stream_url'] + "?client_id=" + sc_key
    track_id = track['id']
    # Final filename
    filename = artist + " - " + title + "-" + str(track_id) + ".mp3"
    # A path separator inside artist/title (e.g. "AC/DC") would make the
    # target point into a directory that does not exist, so replace it.
    for sep in (os.sep, os.altsep):
        if sep:
            filename = filename.replace(sep, "_")
    # Check if file already exists and skip
    temp_path = os.path.join(tempdir, filename)
    if os.path.exists(temp_path):
        print("Track " + title + " (" + str(track_id) + ") already exists, skipping.")
        continue
    # Download track and save to tempdir
    print("Downloading " + title + " (" + str(track_id) + ")...")
    try:
        urllib.urlretrieve(url, temp_path)
    except Exception as err:
        print("Failed to download track " + title + " (" + str(track_id) + ")")
        print(err)
        # Delete the partial file if one was left behind
        if os.path.exists(temp_path):
            os.remove(temp_path)
        # We don't want to stop here...
        continue
    # Check if file is valid mp3
    id3 = eyed3.load(temp_path)
    if id3 is None:
        print("This track seems to be not downloadable, skipping.")
        os.remove(temp_path)
        continue
    print("Done! Adding ID3 tag info...")
    print("- Artist: " + artist)
    print("- Title: " + title)
    print("- Track num: " + str(track_id))
    # Add metadata to track; the SoundCloud id is stored as the track number
    # so the index rebuild can read it back from the file
    id3.initTag()
    id3.tag.artist = artist
    id3.tag.title = title
    id3.tag.track_num = int(track_id)
    # Save ID3 tag info
    print("Saved! Adding track to music database...")
    id3.tag.save()
    # Move file to output dir
    # NOTE(review): the move itself is disabled, so the file stays in tempdir
    # while the database row below points into outdir - confirm this is
    # intentional before relying on the stored path.
    final_path = os.path.join(outdir, filename)
    #os.rename(os.path.join(tempdir, filename), final_path)
    try:
        # Add track to the table
        db.execute(
            "INSERT INTO tracks (id, artist, title, file) VALUES (?, ?, ?, ?)",
            (track_id, artist, title, final_path),
        )
    except Exception as err:
        print("Failed to add track " + title + " (" + str(track_id) + ")")
        print(err)
# Save changes to database
db.commit()
# Count the rows again so the difference to prev_amount can be reported
new_amount = db.execute("SELECT Count(*) FROM tracks").fetchone()[0]
added = new_amount - prev_amount
# Print some statistics
print("\nFinished! New tracks added: " + str(added))
print("Total amount of tracks now: " + str(new_amount))
# List the tracks whose names could not be parsed, with a link to each
if failed:
    print("\nFound invalid tracks: " + str(len(failed)))
    print("Please check if there are named correctly and try again!")
    for bad in failed:
        print(" - " + bad['title'])
        print(" " + bad['permalink_url'])
# Release the database handle now that all work is done
db.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment