Last active
February 17, 2021 00:33
-
-
Save ocshawn/8957185a978bb3ea70df590dbceb6cd4 to your computer and use it in GitHub Desktop.
Add Google Play Music Takeout playlists to plex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
# Python version 3 | |
# Based off of LucidBrot migrate-from-google-play-music and gmusic_playlists_to_plex by Author: Blacktwin, pjft, sdlynx | |
# Author of this version: ocShawn | |
import os, sys, csv, html, requests, re | |
requests.packages.urllib3.disable_warnings() | |
from dataclasses import dataclass | |
from datetime import datetime | |
from plexapi.server import PlexServer, CONFIG | |
""" | |
README | |
Purpose: To allow you to add Google Play Music Takeout playlists to plex | |
1) First get your Google Play Music playlists out of Takeout at https://takeout.google.com/ | |
Requires: plexapi, requests | |
2) Install requirements Run: `python -m pip install --upgrade pip plexapi requests` | |
3) Change below: | |
PLAYLISTS_PATH = path to google takeout Playlists folder | |
PLEX_URL = url to plex server | |
PLEX_TOKEN = See https://support.plex.tv/articles/204059436-finding-an-authentication-token-x-plex-token/ for how to get | |
MUSIC_LIBRARY_NAME = name of plex music library | |
IGNORE_MUSIC_FOLDERS = folders you want to ignore in PLAYLISTS_PATH | |
4) Run Script: `python google_playlists_to_plex.py > out.txt` | |
5) Manually add missed songs in plex | |
After script runs view out.txt file towards the bottom missed tracks will be listed | |
6) Done, PLEX party time | |
""" | |
# Note that path settings are relative to the current working directory if you don't specify absolute paths. | |
# Path to "Takeout / Google Play Music / Playlists" as obtained from takeout.google.com | |
PLAYLISTS_PATH = os.path.normpath('E:\Takeout\Google Play Music\Playlists') | |
# plex server info | |
PLEX_URL = 'http://192.168.0.100:32400/' | |
PLEX_TOKEN = '' | |
MUSIC_LIBRARY_NAME = 'Music' | |
# Folders to ignore | |
IGNORE_MUSIC_FOLDERS=['@eaDir'] | |
# SKIP THUMBS UP playlist set to (True / False) | |
SKIP_THUMBSUP = False | |
## CODE BELOW ## | |
if not PLEX_URL: | |
PLEX_URL = CONFIG.data['auth'].get('server_baseurl') | |
if not PLEX_TOKEN: | |
PLEX_TOKEN = CONFIG.data['auth'].get('server_token') | |
# Connect to Plex Server | |
sess = requests.Session() | |
sess.verify = False | |
plex = PlexServer(PLEX_URL, PLEX_TOKEN, session=sess) | |
PLEX_MUSIC_LIBRARY = plex.library.section(MUSIC_LIBRARY_NAME) | |
@dataclass() | |
class Playlist: | |
""" | |
After collecting, the data must be SOMEWHERE. Here. | |
""" | |
name: str | |
content: list = None | |
def filter_playlists(subfolders): | |
""" | |
Returns folder paths which look like a valid playlist export | |
""" | |
for folder in subfolders: | |
valid=True | |
# check for existence of Metadata.csv | |
if not os.path.isfile(os.path.join(folder, 'Metadata.csv')): | |
valid=False | |
if not os.path.isdir(os.path.join(folder, 'Tracks')): | |
valid=False | |
if not valid: | |
if os.path.split(folder)[1] != "Thumbs Up": | |
print("\tInvalid: {}".format(folder), file=sys.stderr) | |
if valid: | |
yield(folder) | |
@dataclass(frozen=True) | |
class SongInfo: | |
""" | |
Basically a Named Tuple for storing info of a song | |
""" | |
title: str | |
artist: str | |
liked: bool | |
album: str | |
duration_ms: str | |
def read_gpm_playlist(playlistdir, trackdir='Tracks'): | |
""" | |
Returns a list of song names contained in a GPM Takeout Playlist | |
""" | |
song_infos_unsorted = [] | |
tracks_path = os.path.join(playlistdir, trackdir) | |
# Expected contents of that directory is one file per song, | |
# each file csv-formatted and containing something like this: | |
# | |
# Title,Album,Artist,Duration (ms),Rating,Play Count,Removed,Playlist Index | |
# "The Show","Lenka","Lenka","235728","5","24","","4" | |
# <newline> | |
song_csvs = [ f.path for f in os.scandir(tracks_path) if f.is_file() ] | |
for song_csv in song_csvs: | |
try: | |
with open(song_csv, encoding="utf-8") as csvfile: | |
rows = csv.reader(csvfile) | |
for title, album, artist, duration_ms, rating, play_count, removed, playlist_index in rows: | |
if (title.strip() == 'Title') and (artist.strip() == 'Artist') and (album.strip() == 'Album'): | |
# skip headline | |
continue | |
if removed: | |
# skip removed | |
continue | |
# clean up google's htmlencoding mess | |
title = html.unescape(title) | |
album = html.unescape(album) | |
artist = html.unescape(artist) | |
print("Reading GPM {} by {}.".format(title.encode('utf-8'), artist.encode('utf-8'))) | |
song_info = SongInfo(title= title, album= album, artist= artist, liked= (rating == '5'), duration_ms= duration_ms) | |
song_infos_unsorted.append((song_info, playlist_index)) | |
except UnicodeEncodeError as e: | |
print("INFO: Skipping file {} due to Unicode Reading Error.".format(song_csv.encode('utf-8'))) | |
# sort playlist by index | |
song_infos_sorted = sorted(song_infos_unsorted, key=lambda x: x[1]) | |
return [song_tuple[0] for song_tuple in song_infos_sorted] | |
def folders_of_path(folderpath): | |
return os.path.normpath(folderpath).split(os.sep) | |
def is_ignored(folder): | |
path = os.path.normpath(folder) | |
folders= folders_of_path(path) | |
return any([item in folders for item in IGNORE_MUSIC_FOLDERS]) | |
def round_down(num, divisor): | |
""" | |
Parameters | |
---------- | |
num (int,str): Number to round down | |
divisor (int): Rounding digit | |
Returns | |
------- | |
Rounded down int | |
""" | |
num = int(num) | |
return num - (num%divisor) | |
def compare(ggmusic, pmusic): | |
""" | |
Parameters | |
---------- | |
ggmusic (dict): Contains track data from Google Music | |
pmusic (object): Plex item found from search | |
Returns | |
------- | |
pmusic (object): Matched Plex item | |
""" | |
title = str(ggmusic.title.encode('ascii', 'ignore')) | |
duration = int(ggmusic.duration_ms) | |
# Check if track duration match | |
if isinstance(duration, int) and isinstance(pmusic.duration, int): | |
if round_down(duration, 1000) == round_down(pmusic.duration, 1000): | |
return [pmusic] | |
# Lastly, check if title matches | |
if title == pmusic.title: | |
return [pmusic] | |
# check if title matches raw | |
elif ggmusic.title == pmusic.title: | |
return [pmusic] | |
# check if title matches raw trimmed caseless all special carecters removed | |
elif re.sub('[^A-Za-z0-9]+', '', ggmusic.title).lower() == re.sub('[^A-Za-z0-9]+', '', pmusic.title).lower(): | |
return [pmusic] | |
def main(): | |
startTime=datetime.now() | |
print("Considering any playlists in {}".format(PLAYLISTS_PATH)) | |
print("Collecting playlist directories...\n") | |
print("Collecting playlist directories...\n", file=sys.stderr) | |
subfolders = [ f.path for f in os.scandir(PLAYLISTS_PATH) if f.is_dir() and not is_ignored(f.path) ] | |
playlists = list(filter_playlists(subfolders)) | |
for playlistpath in playlists: | |
playlistname = os.path.basename(playlistpath) | |
print("\tPlaylist: {}".format(playlistname)) | |
print("\tPlaylist: {}".format(playlistname), file=sys.stderr) | |
print("Indexing fallback...") | |
fallback_music_file_infos = [] | |
fallback_music_files=[] | |
output_playlists = [] # List of Playlist objects | |
print("Accumulating Contents...") | |
# hackaround for Thumbs up Playlist: add it and handle it separately | |
THUMBSUPHACK="thumbsuphack1234542323232321231233333$2" | |
if not SKIP_THUMBSUP: | |
playlists.append(THUMBSUPHACK) | |
for playlistpath in playlists: | |
if playlistpath != THUMBSUPHACK: | |
playlistname = os.path.basename(playlistpath) | |
print("Accumulating Contents for Playlist {}".format(playlistname)) | |
song_info_list_sorted = read_gpm_playlist(playlistpath) | |
else: | |
playlistname = "Thumbs up" | |
print("Accumulating Contents for Playlist {}".format(playlistname)) | |
song_info_list_sorted = read_gpm_playlist(PLAYLISTS_PATH, trackdir="Thumbs up") | |
song_path_list = [] | |
# instantiate playlist object for later use | |
playlist=Playlist(name=playlistname, content=song_info_list_sorted) | |
output_playlists.append(playlist) | |
print("Done getting playlists from files RunTime: {}".format(datetime.now() - startTime)) | |
print("Done getting playlists from files RunTime: {}".format(datetime.now() - startTime), file=sys.stderr) | |
# print(output_playlists[0].name) | |
# DONE? replace get_all_user_playlist_contents | |
for pl in output_playlists: | |
playlistName = pl.name | |
# Check for existing Plex Playlists, skip if exists | |
if playlistName in [x.title for x in plex.playlists()]: | |
print("Playlist: ({}) already available, skipping...".format(playlistName)) | |
else: | |
playlistContent = [] | |
print("Starting: {} Tracks: {}".format(playlistName, len(pl.content)), file=sys.stderr) | |
print("Adding Playlist: {}".format(playlistName)) | |
# Go through tracks in Google Music Playlist | |
for ggmusicTrackInfo in pl.content: | |
if len(playlistContent) % 50 == 0: | |
print("Remaining:{}".format(len(pl.content) - len(playlistContent)), file=sys.stderr) | |
trackFound = False | |
ggmusic = ggmusicTrackInfo | |
title = str(ggmusic.title) | |
album = str(ggmusic.album) | |
artist = str(ggmusic.artist) | |
# Search Plex for Album title and Track title | |
albumTrackSearch = PLEX_MUSIC_LIBRARY.searchTracks( | |
**{'album.title': album, 'track.title': title}) | |
# Check results | |
if len(albumTrackSearch) == 1: | |
playlistContent += albumTrackSearch | |
trackFound = True | |
if len(albumTrackSearch) > 1: | |
for pmusic in albumTrackSearch: | |
albumTrackFound = compare(ggmusic, pmusic) | |
if albumTrackFound: | |
playlistContent += albumTrackFound | |
trackFound = True | |
break | |
# Nothing found from Album title and Track title | |
if not trackFound: | |
# Search Plex for Track title | |
trackSearch = PLEX_MUSIC_LIBRARY.searchTracks( | |
**{'track.title': title}) | |
if len(trackSearch) == 1: | |
playlistContent += trackSearch | |
trackFound = True | |
if len(trackSearch) > 1: | |
for pmusic in trackSearch: | |
trackFound = compare(ggmusic, pmusic) | |
if trackFound: | |
playlistContent += trackFound | |
trackFound = True | |
break | |
# Nothing found from Track title | |
if not trackFound: | |
# Search Plex for Artist | |
artistSearch = PLEX_MUSIC_LIBRARY.searchTracks( | |
**{'artist.title': artist}) | |
for pmusic in artistSearch: | |
artistFound = compare(ggmusic, pmusic) | |
if artistFound: | |
playlistContent += artistFound | |
trackFound = True | |
break | |
# last shot to find a match | |
if not trackFound: | |
# Use Artist search | |
for pmusic in artistSearch: | |
toMatchA = re.sub('[^A-Za-z0-9]+', '', title).lower() | |
toMatchB = re.sub('[^A-Za-z0-9]+', '', pmusic.title).lower() | |
if toMatchA in toMatchB or toMatchB in toMatchA: | |
playlistContent += [pmusic] | |
trackFound = True | |
break | |
if not trackFound: | |
print(u"Could not find in Plex:\n\t{} - {} - {}".format(artist.encode('utf-8'), album.encode('utf-8'), title.encode('utf-8'))) | |
if len(playlistContent) != 0: | |
print("Google Music Playlist: {}, has {} tracks. {} tracks were added to Plex.".format( | |
playlistName, len(pl.content), len(playlistContent))) | |
plex.createPlaylist(playlistName, playlistContent) | |
print("Google Music Playlist: {}, has {} tracks. {} tracks were added to Plex.".format( | |
playlistName, len(pl.content), len(playlistContent)), file=sys.stderr) | |
else: | |
print("Could not find any matching tracks in Plex for {}".format(playlistName)) | |
print("Total RunTime: {}".format(datetime.now() - startTime)) | |
print("Total RunTime: {}".format(datetime.now() - startTime), file=sys.stderr) | |
if __name__ == '__main__': | |
if len(sys.argv) > 1: | |
if sys.argv[1] == 'help': | |
print("hello. Specify some things in the source file with the CAPS LOCKED variables!") | |
print("If you're running this in Windows CMD, you might need to `set PYTHONIOENCODING=utf-8` first.") | |
print("It is probably advisable to pipe the stdout into a file so that the important messages from STDERR surface clearly.") | |
exit(0) | |
# always: | |
main() | |
print("Done", file=sys.stderr) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment