Skip to content

Instantly share code, notes, and snippets.

@cyanidium
Created October 19, 2014 17:57
Show Gist options
  • Save cyanidium/dc3a726cda28e9d307ef to your computer and use it in GitHub Desktop.
Save cyanidium/dc3a726cda28e9d307ef to your computer and use it in GitHub Desktop.
Checks music files for duplicates based on their "Artist - Title" combination, which should be correct if you also use music_tag_corrector.py to fix those up.
#!/usr/bin/env python
"""
Brute force music matching script. Gets the unique identifiers of each music
file in the music directory and finds duplicate. All duplicates are output in a
list at the end. Output defaults to STDOUT, but can be set as a file.
Tags are used to match files, so make sure the tags are correct (maybe try
music_tag_corrector.py first)
"""
#Requires apt-get install python-mutagen
#Last updated 08 Apr 2012
#cyanidium
import os
import os.path
import sys
import mutagen
import sqlite3
from optparse import OptionParser
from multiprocessing import Pool
#Module-level defaults
#NOTE(review): __API_KEY__ and __API_SECRET__ are not referenced anywhere in
#the visible part of this script
__API_KEY__ = "goes here"
__API_SECRET__ = "goes here"
__PROCESSES__ = 20 #Seems about right to keep everything flowing
__DBFILE__ = "/tmp/MusicDupeMatch.db"

#Command line handling. The module docstring doubles as the --help description.
parser = OptionParser(description=__doc__.strip())
parser.add_option(
    "-d", "--directory",
    dest="music_dir",
    metavar="DIR",
    action="store",
    type="string",
    default=False,
    help="directory with music to search",
)
parser.add_option(
    "-o", "--output",
    dest="output_file",
    metavar="FILE",
    action="store",
    type="string",
    default="",
    help="where to save the duplicate list to",
)
parser.add_option(
    "-t", "--threads",
    dest="processes",
    action="store",
    type="int",
    default=__PROCESSES__,
    help="number of threads to use",
)
#Parsed at import time: the functions below read the global "options"
options, args = parser.parse_args()
def main():
    """
    Function loader.
    Checks the music directory option, creates the temporary database, runs
    the duplicate search and prints the result. If the directory is missing
    or invalid, or the database could not be created because the file already
    exists, the usage help and an error message are printed instead.
    """
    #create_database() is only reached once the directory has been validated
    ready = (options.music_dir and
             os.path.isdir(options.music_dir) and
             create_database())
    if not ready:
        parser.print_help()
        print("Error: no music directory given or %s already exists" % (__DBFILE__))
        return
    #Find the duplicate songs
    try:
        multi_thread_dup_searcher()
        print_dups("title", "Duplicate titles")
    finally:
        #Always remove the temporary database, even if the search failed,
        #otherwise the leftover file makes every later run refuse to start
        os.remove(__DBFILE__)
def create_database():
    """
    Set up a fresh SQLite database containing the "title" table.
    Returns False without touching anything when the database file already
    exists, and True once the table has been created.
    """
    #Refuse to reuse a database file that is already present
    if os.path.exists(__DBFILE__):
        return False
    connection = sqlite3.connect(__DBFILE__)
    connection.text_factory = str
    cursor = connection.cursor()
    #One row per tagged file: id is the match key, filename is the path
    cursor.execute("create table title (id text, filename text)")
    connection.commit()
    connection.close()
    return True
def multi_thread_dup_searcher():
    """
    Uses a pool of worker processes to handle multiple music files at once,
    reducing the time it takes to find the duplicates. The pool size comes
    from options.processes. Any error messages returned by the workers are
    printed in sorted order once every file has been processed.
    """
    file_paths = find_files()
    #Hand every file to the worker pool
    pool = Pool(processes=options.processes)
    pending = pool.map_async(wrap_get_unique_ids, file_paths)
    pool.close()
    #Wait for everything to finish
    pool.join()
    #Workers return False on success and a message string on failure. Drop
    #the successes before sorting so only strings are compared with each other
    #(mixing False and str cannot be ordered on Python 3).
    errors = sorted(result for result in pending.get() if result)
    #Show any files that did not successfully exit
    for error in errors:
        print(error)
def find_files():
    """
    Creates a tuple with the path of every file found below
    options.music_dir, in os.walk order. No filtering by file type is done
    here.
    """
    #Collect into a list and convert once at the end; growing a tuple with +=
    #copies the whole tuple for every file found
    file_paths = []
    for root, _dirs, files in os.walk(options.music_dir):
        file_paths.extend(os.path.join(root, music_file) for music_file in files)
    return tuple(file_paths)
def wrap_get_unique_ids(file_path):
    """
    Run get_unique_ids on one file without letting an exception escape.
    Returns whatever get_unique_ids returns, or a "path:: error" message
    string if it raised.
    """
    try:
        outcome = get_unique_ids(file_path)
    except Exception as error:
        #Report the failure as a string so the remaining files still get processed
        outcome = "%s:: %s" % (file_path, error)
    return outcome
def get_unique_ids(file_path):
    """
    Main function.
    Reads the tags of the given file and, when both an artist and a title tag
    are present, stores the lowercased "artist - title" key together with the
    file path in the "title" table. Files sharing a key are duplicates.
    Returns False (meaning "no error") in every non-raising case; exceptions
    are left to the caller.
    """
    #Handle all music types. easy is needed to nicely handle id3 tags in mp3s
    tags = mutagen.File(file_path, easy=True)
    #Ignore non-music files
    if tags is None:
        return False
    #Files without both tags cannot be matched, so there is nothing to store
    if 'title' not in tags or 'artist' not in tags:
        return False
    #Build the key as text. Calling __str__() on a unicode value under
    #Python 2 encodes it as ASCII and raises for any non-ASCII artist/title,
    #which kept such songs out of the duplicate check entirely.
    #NOTE(review): presumably mutagen hands back text values here - confirm
    text_type = type(u"")
    artist = text_type(tags['artist'][0]).lower()
    title = text_type(tags['title'][0]).lower()
    song_id = u"%s - %s" % (artist, title)
    #Connect to the database
    conn = sqlite3.connect(__DBFILE__)
    try:
        conn.text_factory = str
        c = conn.cursor()
        #####Title#####
        c.execute("INSERT INTO title VALUES (?, ?)", (song_id, file_path))
        conn.commit()
    finally:
        #Clean up, also when the insert fails
        conn.close()
    #No errors
    return False
def print_dups(dup_db, title):
    """
    Print a list of all duplicate songs. Will append to file if it already
    exists. STDOUT is the fallback if needed.
    dup_db is the name of the table to search; it is placed directly into the
    SQL text, so it must be a trusted, internally chosen name.
    title is accepted for the heading of the list but is currently unused.
    Each group of duplicate files is written one path per line, followed by a
    blank line.
    """
    if options.output_file:
        try:
            output = open(options.output_file, 'a')
        except IOError:
            print("Couldn't open/create your output file, printing to STDOUT")
            output = sys.stdout
    else:
        print("No output file specified, printing to STDOUT")
        output = sys.stdout
    #Connect to the database
    conn = sqlite3.connect(__DBFILE__)
    try:
        conn.text_factory = str
        id_cursor = conn.cursor()
        #Use SQL to find the ids of the duplicates
        id_cursor.execute("SELECT id FROM %s GROUP BY id HAVING (COUNT(id)>1)" % (dup_db))
        #NOTE(review): the original indentation was lost; this nesting (one
        #blank line after each group, one more after the last group) is the
        #most plausible reading - confirm against the original script
        for id_row in id_cursor:
            file_cursor = conn.cursor()
            file_cursor.execute("SELECT filename FROM %s WHERE id=? ORDER BY filename" % (dup_db), (id_row[0],))
            for file_row in file_cursor:
                output.write(file_row[0])
                output.write("\n")
            file_cursor.close()
            output.write("\n")
        output.write("\n")
    finally:
        #Clean up
        conn.close()
        #Close the output file if one was opened, but never close STDOUT
        if output is not sys.stdout:
            output.close()
#Only start the duplicate search when run as a script
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment