Last active
March 18, 2018 03:27
-
-
Save rdswift/b65e49a47884896d9bd3409f511cbd2c to your computer and use it in GitHub Desktop.
MusicBrainz Picard 1.4.2 plugin (releaselogger.py) to dump selected CD information to a text file for later import to a database. Python script (log2db.py) to create a SQLite database and populate it from the data dump text file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Python script used to create the initial (empty) SQLite database | |
file if required for the music database, and populate the database | |
from the file generated by the ReleaseLogger plugin for Picard. | |
Once processed, the source ReleaseLogger data file is appended to | |
a backup file, and the source file reset to an empty file. | |
Copyright © 2017, Bob Swift. Released under GPL-2.0 or later. | |
""" | |
import os | |
import sys | |
import time | |
import datetime | |
import sqlite3 | |
import unicodedata | |
# Log file produced by the ReleaseLogger plugin; a relative path, so it is
# resolved against the current working directory.
_FILE_LOG = "ReleaseLogger.txt"
# File that processed log content is appended to before the log is emptied.
_FILE_BACKUP = r"C:\Databases\Picard Music Database\ReleaseLoggerProcessed.txt"
# SQLite database file that is created (if missing) and populated.
_FILE_DB = r"C:\Databases\Picard Music Database\MusicData.db"

# Values for the single row of the Configuration table, given in the same
# order as the columns listed under _TABLE_SPECS["Configuration"].
_DB_CONFIG = [r"X:\Music\CoverArt"]

# Table name -> ordered list of (column name, SQLite column type).
# Every table except "Configuration" additionally receives an
# auto-incrementing "ID" primary key when it is created.
_TABLE_SPECS = {
    "Configuration": [
        ("CoverArtDir", "TEXT"),
    ],
    "Releases": [
        ("Selected", "INTEGER"),
        ("ReleaseID", "TEXT"),
        ("ReleaseYear", "TEXT"),
        ("ReleaseTypePrimary", "TEXT"),
        ("ReleaseTypeFull", "TEXT"),
        ("ReleaseDiscs", "INTEGER"),
        ("ReleaseTitle", "TEXT"),
        ("ReleaseArtist", "TEXT"),
        ("ReleaseArtistSort", "TEXT"),
        ("ReleaseInCollection", "INTEGER"),
        ("ReleaseAddedDate", "TEXT"),
    ],
    "ReleaseMedia": [
        ("ReleaseID", "TEXT"),
        ("MediaNumber", "INTEGER"),
        ("MediaType", "TEXT"),
        ("MediaName", "TEXT"),
    ],
    "Artists": [
        ("ArtistID", "TEXT"),
        ("ArtistName", "TEXT"),
        ("ArtistNameSort", "TEXT"),
    ],
    "Tracks": [
        ("ReleaseID", "TEXT"),
        ("TrackID", "TEXT"),
        ("RecordingID", "TEXT"),
        ("DiscNumber", "INTEGER"),
        ("TrackNumber", "INTEGER"),
        ("TrackLength", "TEXT"),
        ("TrackTitle", "TEXT"),
        ("TrackArtist", "TEXT"),
    ],
    "ReleaseArtists": [
        ("ReleaseID", "TEXT"),
        ("ArtistIndex", "INTEGER"),
        ("ArtistID", "TEXT"),
    ],
    "TrackArtists": [
        ("TrackID", "TEXT"),
        ("ArtistIndex", "INTEGER"),
        ("ArtistID", "TEXT"),
    ],
}
############################################################################## | |
def erase_file(file_to_erase):
    """
    Remove the given path from disk when it names an existing regular file.

    Paths that do not exist, or that are not regular files (for example
    directories), are left untouched.
    """
    if not os.path.isfile(file_to_erase):
        return
    os.remove(file_to_erase)
############################################################################## | |
def timestring():
    """
    Return the local time string for the current time.

    The string is formatted as HH:MM:SS (24-hour clock, no milliseconds).
    """
    return time.strftime("%H:%M:%S", time.localtime())
############################################################################## | |
def ts_print(text_to_print):
    """
    Write the given text to standard output, prefixed with the current
    local time and a colon.
    """
    stamp = timestring()
    message = "%s: %s" % (stamp, text_to_print)
    print(message)
############################################################################## | |
def make_table(tableName):
    """
    Creates the specified table within the database.

    tableName must be a key of _TABLE_SPECS; its (column, type) pairs define
    the table's columns.  Every table except "Configuration" also gets an
    auto-incrementing integer "ID" primary key as its first column.
    """
    columns = ["%s %s" % (name, sqlType) for (name, sqlType) in _TABLE_SPECS[tableName]]
    if tableName != "Configuration":
        columns.insert(0, "ID INTEGER PRIMARY KEY AUTOINCREMENT")
    tableSpec = ", ".join(columns)
    ts_print("Creating database table: %s" % (tableName,))
    connection = sqlite3.connect(_FILE_DB)
    try:
        # "with connection" only scopes a transaction (commit on success,
        # rollback on error); it does not close the connection, so close it
        # explicitly in the finally clause.
        with connection:
            cursor = connection.cursor()
            cursor.execute('PRAGMA encoding="UTF-8";')
            cursor.execute("CREATE TABLE %s (%s)" % (tableName, tableSpec))
    finally:
        connection.close()
############################################################################## | |
def make_database():
    """
    Creates the SQLite database for the music data.

    Creates every table listed in _TABLE_SPECS and then writes the initial
    configuration row.
    """
    ts_print("Creating database file: %s" % (_FILE_DB,))
    for table_name in _TABLE_SPECS:
        make_table(table_name)
    ts_print("Database file creation complete.")
    initialize_database_configuration()
############################################################################## | |
def initialize_database_configuration():
    """
    Initializes the configuration for the SQLite music database.

    Inserts _DB_CONFIG as the row of the Configuration table.  If the number
    of values in _DB_CONFIG does not match the number of Configuration
    columns, an error is printed and the program exits with status 3.
    """
    if len(_DB_CONFIG) != len(_TABLE_SPECS["Configuration"]):
        ts_print("ERROR! Invalid Configuration table information.")
        sys.exit(3)
    ts_print("Initializing database configuration.")
    insertItem("Configuration", _DB_CONFIG)
    ts_print("Database configuration initialized.")
############################################################################## | |
def checkItem(tableName, checkList, checkItems):
    """
    Checks if an item already exists in the SQLite music database.

    tableName  -- table to search; must be a key of _TABLE_SPECS.
    checkList  -- SQL WHERE clause using "?" placeholders
                  (for example "ReleaseID=? AND ArtistID=?").
    checkItems -- sequence of values bound to the placeholders, in order.

    Returns the number of matching rows as an int (0 when none exist).
    Exits with status 1 if the database file is missing, or status 2 if the
    table name is unknown.
    """
    if not os.path.isfile(_FILE_DB):
        ts_print("ERROR! Database not found.")
        sys.exit(1)
    if tableName not in _TABLE_SPECS:
        ts_print("ERROR! Table '%s' not found." % (tableName,))
        sys.exit(2)
    selectCommand = "SELECT COUNT(*) FROM %s WHERE %s" % (tableName, checkList)
    connection = sqlite3.connect(_FILE_DB)
    try:
        cursor = connection.cursor()
        cursor.execute('PRAGMA encoding="UTF-8";')
        cursor.execute(selectCommand, list(checkItems))
        # COUNT(*) always yields exactly one row with one column.
        return int(cursor.fetchone()[0])
    finally:
        # The sqlite3 connection context manager does not close the
        # connection, so release it explicitly.
        connection.close()
############################################################################## | |
def insertItem(tableName, parts):
    """
    Inserts a new item in the SQLite music database.

    tableName -- table to insert into; must be a key of _TABLE_SPECS.
    parts     -- sequence of values, one per column of the table, in the
                 order the columns are listed in _TABLE_SPECS.

    Exits with status 1 if the database file is missing, or status 2 if the
    table name is unknown.  Nothing is inserted for a table that has no
    columns defined.
    """
    if not os.path.isfile(_FILE_DB):
        ts_print("ERROR! Database not found.")
        sys.exit(1)
    if tableName not in _TABLE_SPECS:
        ts_print("ERROR! Table '%s' not found." % (tableName,))
        sys.exit(2)
    columnNames = [name for (name, _columnType) in _TABLE_SPECS[tableName]]
    if not columnNames:
        return
    insertCommand = "INSERT INTO %s (%s) VALUES (%s)" % (
        tableName,
        ", ".join(columnNames),
        ", ".join(["?"] * len(columnNames)),
    )
    connection = sqlite3.connect(_FILE_DB)
    try:
        # "with connection" commits on success and rolls back on error, but
        # does not close the connection; the finally clause does that.
        with connection:
            cursor = connection.cursor()
            cursor.execute('PRAGMA encoding="UTF-8";')
            cursor.execute(insertCommand, list(parts))
    finally:
        connection.close()
############################################################################## | |
def main():
    """
    Main function of the module.

    Creates a new SQLite music database, if necessary, and processes the
    file generated by the ReleaseLogger plugin for Picard to populate
    the database.  Each log line is a "^^^"-separated record whose first
    field is the record type (R, A, T, RA, TA or M); a record is inserted
    only when no matching row already exists.  Afterwards the log content is
    appended to the backup file, the log file is emptied, and a summary of
    the counters is printed.

    Exits with status 4 if the ReleaseLogger log file does not exist.
    """
    print("\n")
    ts_print("Checking for database file.")
    if not os.path.isfile(_FILE_DB):
        make_database()
    else:
        ts_print("Database file found.")
    ts_print("Checking for ReleaseLogger log file.")
    if not os.path.isfile(_FILE_LOG):
        ts_print("ERROR! ReleaseLogger log file not found.")
        sys.exit(4)

    # Record type -> (table name, WHERE clause of the duplicate check,
    # indexes of the log fields bound to that clause, offset in lCount of
    # the "existing" counter; the "added" counter is at offset + 1).
    lineSpecs = {
        "R": ("Releases", "ReleaseID=?", (0,), 3),
        "A": ("Artists", "ArtistID=?", (0,), 5),
        "T": ("Tracks", "TrackID=?", (1,), 7),
        "RA": ("ReleaseArtists", "ReleaseID=? AND ArtistID=?", (0, 2), 9),
        "TA": ("TrackArtists", "TrackID=? AND ArtistID=?", (0, 2), 11),
        "M": ("ReleaseMedia", "ReleaseID=? AND MediaNumber=?", (0, 1), 13),
    }

    lCount = [0] * 20
    # Offset: 0 = Lines Read
    #         1 = Unknown Lines
    #         2 = Blank Lines
    #         3 = Releases Existing
    #         4 = Releases Added
    #         5 = Artists Existing
    #         6 = Artists Added
    #         7 = Tracks Existing
    #         8 = Tracks Added
    #         9 = ReleaseArtists Existing
    #        10 = ReleaseArtists Added
    #        11 = TrackArtists Existing
    #        12 = TrackArtists Added
    #        13 = Media Existing
    #        14 = Media Added
    #     15-19 = unused

    ts_print("Processing the ReleaseLogger log file.")
    with open(_FILE_LOG, encoding="utf8") as iFile:
        for line in iFile:
            lCount[0] += 1
            lCountText = format(lCount[0], ",d")
            print(" Line: %10s\r" % (lCountText), end="", flush=True)

            text = line.strip()
            if not text:
                # str.split() never returns an empty list, so blank lines
                # have to be detected before splitting to be counted here.
                lCount[2] += 1
                continue

            fields = text.split("^^^")
            spec = lineSpecs.get(fields[0].upper())
            if spec is None:
                lCount[1] += 1
                continue
            tableName, checkList, checkIndexes, cOffset = spec
            parts = fields[1:]
            checkItems = [parts[index] for index in checkIndexes]

            if tableName == "Releases":
                # The log record carries neither the leading "Selected" nor
                # the "ReleaseInCollection" column; both start out as 0.
                parts.insert(0, 0)
                parts.append(0)
                # NOTE(review): the record then still lacks a value for the
                # last column, ReleaseAddedDate, which makes the insert fail
                # with a binding-count error.  Filling it with today's ISO
                # date is an assumption about the intended content - confirm.
                if len(parts) == len(_TABLE_SPECS["Releases"]) - 1:
                    parts.append(datetime.date.today().isoformat())

            if checkItem(tableName, checkList, checkItems):
                lCount[cOffset] += 1
            else:
                insertItem(tableName, parts)
                lCount[cOffset + 1] += 1

    ts_print("Data import complete.")
    ts_print("Backing up the ReleaseLogger log file.")
    with open(_FILE_BACKUP, "a", encoding="utf8") as oFile:
        with open(_FILE_LOG, encoding="utf8") as iFile:
            oFile.write(iFile.read())
    # Re-opening the log in write mode truncates it to an empty file.
    with open(_FILE_LOG, "w"):
        pass
    ts_print("Log file moved to processed backup file.")

    print("\nLines Processed: %s (%s blank, %s unknown)" % (
        format(lCount[0], ",d"),
        format(lCount[2], ",d"),
        format(lCount[1], ",d"),
    ))
    # (message template, offset of the "existing" counter) in report order.
    summary = (
        ("Releases Added: %s (%s existing)", 3),
        ("Release Media Added: %s (%s existing)", 13),
        ("Artists Added: %s (%s existing)", 5),
        ("Tracks Added: %s (%s existing)", 7),
        ("Release Artists Added: %s (%s existing)", 9),
        ("Track Artists Added: %s (%s existing)", 11),
    )
    for template, offset in summary:
        added = format(lCount[offset + 1], ",d")
        existing = format(lCount[offset], ",d")
        print(template % (added, existing,))
    print("")
############################################################################## | |
if __name__ == '__main__': | |
main() | |
############################################################################## |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
PLUGIN_NAME = 'Release Logger'
PLUGIN_AUTHOR = 'Bob Swift (rdswift)'
# Shown to the user as HTML; lists every record type the plugin writes.
PLUGIN_DESCRIPTION = '''
This plugin saves information regarding any loaded releases to a log file.
By default, the file is called 'ReleaseLogger.txt' and is located in the
file naming destination directory.
<br /><br />
The information is stored in the following lines:
<ul>
<li>A: Artist Information (Artist ID, Name, Sort Name)
<li>R: Release Information (Release ID, Year, Primary Release Type, Full
Release Type, Total Discs, Album Title, Album Artist, Album Artist Sort)
<li>M: Release Media Information (Release ID, Media Number, Media Format,
Media Name)
<li>T: Track Information (Release ID, Track ID, Recording ID, Disc Number,
Track Number, Track Length, Track Title, Track Artist)
<li>RA: Release Artist Information (Release ID, Artist List Order Number,
Artist ID)
<li>TA: Track Artist Information (Track ID, Artist List Order Number,
Artist ID)
</ul>
'''
PLUGIN_VERSION = "0.1"
PLUGIN_API_VERSIONS = ["1.4"]
PLUGIN_LICENSE = "GPL-2.0 or later"
PLUGIN_LICENSE_URL = "https://www.gnu.org/licenses/gpl-2.0.html"

# Name of the log file; it is appended to the "move_files_to" setting.
_log_filename = "\\ReleaseLogger.txt"
#_log_filename = "C:\\CD Rip\\Tagged\\ReleaseLogger.txt"
import os | |
from picard import config, log | |
from picard.metadata import register_album_metadata_processor | |
from picard.metadata import register_track_metadata_processor | |
from picard.plugin import PluginPriority | |
# Full path of the log file, inside Picard's "move_files_to" directory.
# os.path.join supplies exactly one separator; any leading slash or backslash
# on _log_filename is stripped so it cannot double the separator or end up as
# a literal backslash in the file name on non-Windows systems.
# Note: this is evaluated once, when the plugin module is loaded.
file_to_write = os.path.join(config.setting["move_files_to"], _log_filename.lstrip("\\/"))
def write_line(object_to_write):
    """
    Append object_to_write, followed by a newline, to the log file as
    UTF-8 encoded text.
    """
    # NOTE(review): writing encoded bytes to a file opened in text mode
    # presumably relies on Python 2 string semantics - confirm before porting.
    encoded = ("%s\n" % (object_to_write)).encode("utf8")
    with open(file_to_write, "a") as log_file:
        log_file.write(encoded)
def log_release_info(album, metadata, release):
    """
    Album metadata processor: log the release, its media and its artists.

    Writes, in this order:
      * one "R" line for the release,
      * one "M" line per medium of the release,
      * an "A" line followed by an "RA" line for every artist credited on
        the release; the RA index counts from 0 in credit order.

    album    -- not used by this function.
    metadata -- album metadata; read for the release-level tag values.
    release  -- release XML node; read for the release id, the medium list
                and the artist credits.
    """
    release_id = release.id

    # Use the "~aae..." variables when they are present, otherwise fall back
    # to Picard's own album artist values.
    if "~aaeCredAlbumArtists" in metadata.keys():
        album_artist = metadata["~aaeCredAlbumArtists"]
    else:
        album_artist = metadata["~albumartists"]
    if "~aaeSortAlbumArtists" in metadata.keys():
        album_artist_sort = metadata["~aaeSortAlbumArtists"]
    else:
        album_artist_sort = metadata["~albumartistsort"]

    # First four characters of the original year, else of the date, else
    # "----" when both are empty.
    album_year = (metadata["originalyear"] + metadata["date"] + "-" * 4)[:4]

    write_line("^^^".join([
        "R",
        release_id,
        album_year,
        metadata["~primaryreleasetype"],
        metadata["releasetype"],
        metadata["totaldiscs"],
        metadata["album"],
        album_artist,
        album_artist_sort,
    ]))

    for medium_info in release.medium_list[0].medium:
        medium_number = medium_info.position[0].text
        medium_format = medium_info.format[0].text
        try:
            medium_name = medium_info.title[0].text
        except Exception:
            # A medium without a title is logged with an empty name.  The
            # handler is left broad on purpose: the exception raised for a
            # missing child depends on Picard's XML node class.
            medium_name = ""
        write_line("^^^".join([
            "M",
            release_id,
            medium_number,
            medium_format,
            medium_name,
        ]))

    for artist_index, ncredit in enumerate(release.artist_credit[0].name_credit):
        artist = ncredit.artist[0]
        artist_id = artist.id
        write_line("^^^".join([
            "A",
            artist_id,
            artist.name[0].text,
            artist.sort_name[0].text,
        ]))
        write_line("^^^".join([
            "RA",
            release_id,
            "%d" % (artist_index),
            artist_id,
        ]))
def log_track_info(album, metadata, track, release):
    """
    Track metadata processor: log the track and its artists.

    Writes one "T" line for the track, then an "A" line followed by a "TA"
    line for every artist credited on the track's recording; the TA index
    counts from 0 in credit order.

    album    -- not used by this function.
    metadata -- track metadata; read for the track-level tag values.
    track    -- track XML node; read for the recording's artist credits.
    release  -- release XML node; only its id is used.
    """
    track_id = metadata["musicbrainz_trackid"]

    write_line("^^^".join([
        "T",
        release.id,
        track_id,
        metadata["musicbrainz_recordingid"],
        metadata["discnumber"],
        metadata["tracknumber"],
        metadata["~length"],
        metadata["title"],
        metadata["artist"],
    ]))

    credits = track.recording[0].artist_credit[0].name_credit
    for artist_index, ncredit in enumerate(credits):
        artist = ncredit.artist[0]
        artist_id = artist.id
        write_line("^^^".join([
            "A",
            artist_id,
            artist.name[0].text,
            artist.sort_name[0].text,
        ]))
        write_line("^^^".join([
            "TA",
            track_id,
            "%d" % (artist_index),
            artist_id,
        ]))
# Register both processors at LOW priority so that other plugins which modify
# the contents of the _albumartists and _albumartists_sort lists can complete
# their processing first, and this plugin logs the latest updated data.
# log_release_info is registered as the album metadata processor and
# log_track_info as the track metadata processor.
register_album_metadata_processor(log_release_info, priority=PluginPriority.LOW)
register_track_metadata_processor(log_track_info, priority=PluginPriority.LOW)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment