Skip to content

Instantly share code, notes, and snippets.

@rdswift
Last active March 18, 2018 03:27
Show Gist options
  • Save rdswift/b65e49a47884896d9bd3409f511cbd2c to your computer and use it in GitHub Desktop.
Save rdswift/b65e49a47884896d9bd3409f511cbd2c to your computer and use it in GitHub Desktop.
MusicBrainz Picard 1.4.2 plugin (releaselogger.py) to dump selected CD information to a text file for later import to a database. Python script (log2db.py) to create a SQLite database and populate it from the data dump text file.
"""
Python script used to create the initial (empty) SQLite database
file if required for the music database, and populate the database
from the file generated by the ReleaseLogger plugin for Picard.
Once processed, the source ReleaseLogger data file is appended to
a backup file, and the source file reset to an empty file.
Copyright © 2017, Bob Swift. Released under GPL-2.0 or later.
"""
import os
import sys
import time
import datetime
import sqlite3
import unicodedata
_FILE_LOG = 'ReleaseLogger.txt'
_FILE_BACKUP = 'C:\\Databases\\Picard Music Database\\ReleaseLoggerProcessed.txt'
_FILE_DB = 'C:\\Databases\\Picard Music Database\\MusicData.db'
_DB_CONFIG = [
"X:\\Music\\CoverArt",
]
_TABLE_SPECS = {
'Configuration': [
('CoverArtDir', 'TEXT'),
],
'Releases': [
('Selected', 'INTEGER'),
('ReleaseID', 'TEXT'),
('ReleaseYear', 'TEXT'),
('ReleaseTypePrimary', 'TEXT'),
('ReleaseTypeFull', 'TEXT'),
('ReleaseDiscs', 'INTEGER'),
('ReleaseTitle', 'TEXT'),
('ReleaseArtist', 'TEXT'),
('ReleaseArtistSort', 'TEXT'),
('ReleaseInCollection', 'INTEGER'),
('ReleaseAddedDate', 'TEXT'),
],
'ReleaseMedia': [
('ReleaseID', 'TEXT'),
('MediaNumber', 'INTEGER'),
('MediaType', 'TEXT'),
('MediaName', 'TEXT'),
],
'Artists': [
('ArtistID', 'TEXT'),
('ArtistName', 'TEXT'),
('ArtistNameSort', 'TEXT'),
],
'Tracks': [
('ReleaseID', 'TEXT'),
('TrackID', 'TEXT'),
('RecordingID', 'TEXT'),
('DiscNumber', 'INTEGER'),
('TrackNumber', 'INTEGER'),
('TrackLength', 'TEXT'),
('TrackTitle', 'TEXT'),
('TrackArtist', 'TEXT'),
],
'ReleaseArtists': [
('ReleaseID', 'TEXT'),
('ArtistIndex', 'INTEGER'),
('ArtistID', 'TEXT'),
],
'TrackArtists': [
('TrackID', 'TEXT'),
('ArtistIndex', 'INTEGER'),
('ArtistID', 'TEXT'),
]
}
##############################################################################
def erase_file(file_to_erase):
"""
Delete the specified file if it exists.
"""
if os.path.isfile(file_to_erase):
os.remove(file_to_erase)
##############################################################################
def timestring():
"""
Return the local time string for the current time.
"""
time_value = time.time()
mils = "%03d" % (int((time_value - int(time_value)) * 1000))
tistr = time.strftime("%H:%M:%S", time.localtime(time_value))
#return "%s.%s" % (tistr, mils)
return "%s" % (tistr)
##############################################################################
def ts_print(text_to_print):
"""
Prints the specified text with the current timestamp.
"""
print("%s: %s" % (timestring(), text_to_print))
##############################################################################
def make_table(tableName):
"""
Creates the specified table within the database.
"""
temp = []
for (a, b) in (_TABLE_SPECS[tableName]):
temp.append("%s %s" % (a, b))
if tableName == "Configuration":
tableSpec = ""
else:
tableSpec = "ID INTEGER PRIMARY KEY AUTOINCREMENT, "
tableSpec = tableSpec + ", ".join(temp[:])
ts_print("Creating database table: %s" % (tableName,))
with sqlite3.connect(_FILE_DB) as c:
cursor = c.cursor()
cursor.execute('PRAGMA encoding="UTF-8";')
createCommand = "CREATE TABLE %s (%s)" % (tableName, tableSpec)
cursor.execute(createCommand)
c.commit()
##############################################################################
def make_database():
"""
Creates the SQLite database for the music data.
"""
ts_print("Creating database file: %s" % (_FILE_DB,))
for table_name in _TABLE_SPECS.keys():
make_table(table_name)
ts_print("Databse file creation complete.")
initialize_database_configuration()
##############################################################################
def initialize_database_configuration():
"""
Initializes the configuration for the SQLite music database.
"""
if len(_DB_CONFIG) == len(_TABLE_SPECS["Configuration"]):
ts_print("Initializing database configuration.")
insertItem("Configuration", _DB_CONFIG)
else:
ts_print("ERROR! Invalid Configuration table information.")
sys.exit(3)
ts_print("Databse configuration initialized.")
##############################################################################
def checkItem(tableName, checkList, checkItems):
"""
Checks if an item already exists in the SQLite music database.
"""
if os.path.isfile(_FILE_DB):
if tableName in _TABLE_SPECS.keys():
with sqlite3.connect(_FILE_DB) as c:
cursor = c.cursor()
cursor.execute('PRAGMA encoding="UTF-8";')
createCommand = "SELECT COUNT(*) FROM %s WHERE %s" % (tableName, checkList,)
# print("")
# print(createCommand)
# print(checkItems[:])
cursor.execute(createCommand, (checkItems[:]))
return int("%s" % cursor.fetchone())
else:
ts_print("ERROR! Table '%s' not found." % (tableName,))
sys.exit(2)
else:
ts_print("ERROR! Database not found.")
sys.exit(1)
##############################################################################
def insertItem(tableName, parts):
"""
Inserts a new item in the SQLite music database.
"""
if os.path.isfile(_FILE_DB):
if tableName in _TABLE_SPECS.keys():
cCount = len(_TABLE_SPECS[tableName])
if cCount:
tItems = []
qMarks = ", ".join("?" * cCount)
for (tItem, tSpec) in _TABLE_SPECS[tableName]:
tItems.append(tItem)
cmdItems = ", ".join(tItems[:])
with sqlite3.connect(_FILE_DB) as c:
cursor = c.cursor()
cursor.execute('PRAGMA encoding="UTF-8";')
createCommand = "INSERT INTO %s (%s) VALUES (%s)" % (tableName, cmdItems, qMarks)
cursor.execute(createCommand, (parts[:]))
c.commit()
else:
ts_print("ERROR! Table '%s' not found." % (tableName,))
sys.exit(2)
else:
ts_print("ERROR! Database not found.")
sys.exit(1)
##############################################################################
def main():
"""
Main function of the module.
Creates a new SQLite music database, if necessary, and processes the
file generated by the ReleaseLogger plugin for Picarddatabase to populate
the database.
"""
print("\n")
ts_print("Checking for database file.")
if not os.path.isfile(_FILE_DB):
make_database()
else:
ts_print("Database file found.")
ts_print("Checking for ReleaseLogger log file.")
if not os.path.isfile(_FILE_LOG):
ts_print("ERROR! ReleaseLogger log file not found.")
sys.exit(4)
lCount = [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ]
# Offset: 0 = Lines Read
# 1 = Unknown Lines
# 2 = Blank Lines
# 3 = Releases Existing
# 4 = Releases Added
# 5 = Artists Existing
# 6 = Artists Added
# 7 = Tracks Existing
# 8 = Tracks Added
# 9 = ReleaseArtists Existing
# 10 = ReleaseArtists Added
# 11 = TrackArtists Existing
# 12 = TrackArtists Added
# 13 = Media Existing
# 14 = Media Added
# 15 = unused
# 16 = unused
# 17 = unused
# 18 = unused
# 19 = unused
ts_print("Processing the ReleaseLogger log file.")
with open(_FILE_LOG, encoding="utf8") as iFile:
for line in iFile:
lCount[0] += 1
lCountText = format(lCount[0], ",d")
print(" Line: %10s\r" % (lCountText), end="", flush=True)
parts = line.strip().split("^^^")
if len(parts):
#time.sleep(.2)
lineType = parts[0]
parts = parts[1:]
tableName = "Unknown"
checkItems = []
if lineType.upper() == "R":
parts.append(0,)
tableName = "Releases"
checkList = "ReleaseID=?"
checkItems.append(parts[0])
cOffset = 3
parts.insert(0, 0)
elif lineType.upper() == "A":
tableName = "Artists"
checkList = "ArtistID=?"
checkItems.append(parts[0])
cOffset = 5
elif lineType.upper() == "T":
tableName = "Tracks"
checkList = "TrackID=?"
checkItems.append(parts[1])
cOffset = 7
elif lineType.upper() == "RA":
tableName = "ReleaseArtists"
checkList = "ReleaseID=? AND ArtistID=?"
checkItems.append(parts[0])
checkItems.append(parts[2])
cOffset = 9
elif lineType.upper() == "TA":
tableName = "TrackArtists"
checkList = "TrackID=? AND ArtistID=?"
checkItems.append(parts[0])
checkItems.append(parts[2])
cOffset = 11
elif lineType.upper() == "M":
tableName = "ReleaseMedia"
checkList = "ReleaseID=? AND MediaNumber=?"
checkItems.append(parts[0])
checkItems.append(parts[1])
cOffset = 13
else:
lCount[1] += 1
if tableName in _TABLE_SPECS.keys():
if checkItem(tableName, checkList, checkItems):
lCount[cOffset] += 1
else:
insertItem(tableName, parts)
lCount[cOffset + 1] += 1
else:
lCount[2] += 1
ts_print("Data import complete.")
ts_print("Backing up the ReleaseLogger log file.")
with open(_FILE_BACKUP, "a", encoding="utf8") as oFile:
with open(_FILE_LOG, encoding="utf8") as iFile:
oFile.write(iFile.read())
with open(_FILE_LOG, "w") as iFile:
pass
ts_print("Log file moved to processed backup file.")
a = format(lCount[0], ",d")
b = format(lCount[2], ",d")
c = format(lCount[1], ",d")
print("\nLines Processed: %s (%s blank, %s unknown)" % (a, b, c,))
a = format(lCount[4], ",d")
b = format(lCount[3], ",d")
print("Releases Added: %s (%s existing)" % (a, b,))
a = format(lCount[14], ",d")
b = format(lCount[13], ",d")
print("Release Media Added: %s (%s existing)" % (a, b,))
a = format(lCount[6], ",d")
b = format(lCount[5], ",d")
print("Artists Added: %s (%s existing)" % (a, b,))
a = format(lCount[8], ",d")
b = format(lCount[7], ",d")
print("Tracks Added: %s (%s existing)" % (a, b,))
a = format(lCount[10], ",d")
b = format(lCount[9], ",d")
print("Release Artists Added: %s (%s existing)" % (a, b,))
a = format(lCount[12], ",d")
b = format(lCount[11], ",d")
print("Track Artists Added: %s (%s existing)" % (a, b,))
print("")
##############################################################################
if __name__ == '__main__':
main()
##############################################################################
PLUGIN_NAME = 'Release Logger'
PLUGIN_AUTHOR = 'Bob Swift (rdswift)'
PLUGIN_DESCRIPTION = '''
This plugin saves information regarding any loaded releases to a log file.
By default, the file is called 'ReleaseLogger.txt' and is located in the
file naming destination directoiry.
<br /><br />
The information is stored in the following lines:
<ul>
<li>A: Artist Information (Artist ID, Name, Sort Name)
<li>R: Release Information (Release ID, Year, Primary Release Type, Full
Release Type, Total Discs, Album Title, Album Artist, Album Artist Sort)
<li>T: Track Information (Release ID, Track ID, Recording ID, Disc Number,
Track Number, Track Length, Track Title, Track Artist)
<li>RA: Release Artist Information (Release ID, Artist List Order Number,
Artist ID)
<li>TA: Track Artist Information (Track ID, Artist List Order Number,
Artist ID)
</ul>
'''
PLUGIN_VERSION = "0.1"
PLUGIN_API_VERSIONS = ["1.4"]
PLUGIN_LICENSE = "GPL-2.0 or later"
PLUGIN_LICENSE_URL = "https://www.gnu.org/licenses/gpl-2.0.html"
_log_filename = "\\ReleaseLogger.txt"
#_log_filename = "C:\\CD Rip\\Tagged\\ReleaseLogger.txt"
import os
from picard import config, log
from picard.metadata import register_album_metadata_processor
from picard.metadata import register_track_metadata_processor
from picard.plugin import PluginPriority
file_to_write = config.setting["move_files_to"] + os.path.sep + _log_filename
def write_line(object_to_write):
line = "%s\n" % (object_to_write)
line = line.encode("utf8")
with open(file_to_write, "a") as f:
f.write(line)
def log_release_info(album, metadata, release):
#line_out = "Release: %s" % (release)
#write_line(line_out)
albumid = release.id
if "~aaeCredAlbumArtists" in metadata.keys():
album_artist = metadata["~aaeCredAlbumArtists"]
else:
album_artist = metadata["~albumartists"]
if "~aaeSortAlbumArtists" in metadata.keys():
album_artist_sort = metadata["~aaeSortAlbumArtists"]
else:
album_artist_sort = metadata["~albumartistsort"]
album_year = (metadata["originalyear"] + metadata["date"] + "-" * 4)[:4]
line_out = "^^^".join([
"R",
release.id,
album_year,
metadata["~primaryreleasetype"],
metadata["releasetype"],
metadata["totaldiscs"],
metadata["album"],
album_artist,
album_artist_sort,
])
write_line(line_out)
#line_out = "Release: %s" % (release)
#write_line(line_out)
for medium_info in release.medium_list[0].medium:
mNumber = medium_info.position[0].text
mFormat = medium_info.format[0].text
#if "title" in medium_info.children:
try:
mName = medium_info.title[0].text
#else:
except Exception:
mName = ""
line_out = "^^^".join([
"M",
release.id,
mNumber,
mFormat,
mName,
])
write_line(line_out)
aCount = 0
for ncredit in release.artist_credit[0].name_credit:
aID = ncredit.artist[0].id
aName = ncredit.artist[0].name[0].text
sName = ncredit.artist[0].sort_name[0].text
line_out = "^^^".join([
"A",
aID,
aName,
sName,
])
write_line(line_out)
tCount = "%d" % (aCount)
line_out = "^^^".join([
"RA",
release.id,
tCount,
aID,
])
write_line(line_out)
aCount += 1
def log_track_info(album, metadata, track, release):
#line_out = "Track: %s" % (track)
#write_line(line_out)
albumid = release.id
line_out = "^^^".join([
"T",
release.id,
metadata["musicbrainz_trackid"],
metadata["musicbrainz_recordingid"],
metadata["discnumber"],
metadata["tracknumber"],
metadata["~length"],
metadata["title"],
metadata["artist"],
])
write_line(line_out)
aCount = 0
for ncredit in track.recording[0].artist_credit[0].name_credit:
aID = ncredit.artist[0].id
aName = ncredit.artist[0].name[0].text
sName = ncredit.artist[0].sort_name[0].text
line_out = "^^^".join([
"A",
aID,
aName,
sName,
])
write_line(line_out)
tCount = "%d" % (aCount)
line_out = "^^^".join([
"TA",
metadata["musicbrainz_trackid"],
tCount,
aID,
])
write_line(line_out)
aCount += 1
# Register the plugin to run at a LOW priority so that other plugins that
# modify the contents of the _albumartists and _albumartists_sort lists can
# complete their processing and this plugin is working with the latest
# updated data.
register_album_metadata_processor(log_release_info, priority=PluginPriority.LOW)
register_track_metadata_processor(log_track_info, priority=PluginPriority.LOW)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment