-
-
Save mikez/eca572f1b6d8d658e0b33d2001d23a9b to your computer and use it in GitHub Desktop.
Experimental Spotify folders with LevelDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
Get your Spotify folder hierarchy with playlists into JSON. | |
:license: MIT, see LICENSE for more details. | |
""" | |
from __future__ import print_function | |
import argparse | |
import json | |
import os | |
import re | |
import sys | |
try: | |
from urllib import unquote_plus # Python 2 | |
except ImportError: | |
from urllib.parse import unquote_plus # Python 3 | |
# Default location of Spotify's per-user cache directory on this platform.
# "~" is left unexpanded here; consumers call `os.path.expanduser()` on it.
if sys.platform == "darwin":
    # Mac
    PERSISTENT_CACHE_PATH = (
        "~/Library/Application Support/Spotify/PersistentCache/Users"
    )
elif sys.platform == "win32":
    # Windows, via Microsoft store or standalone
    # LOCALAPPDATA may be missing from a stripped-down environment; passing
    # None to os.path.join() would crash at import time, so fall back to the
    # conventional location below the user's home directory.
    windows_appdata_path = os.getenv("LOCALAPPDATA") or os.path.expanduser(
        "~\\AppData\\Local"
    )
    windows_store_path = os.path.join(
        windows_appdata_path,
        "Packages\\SpotifyAB.SpotifyMusic_zpdnekdrzrea0\\LocalState" "\\Spotify\\Users",
    )
    if os.path.exists(windows_store_path):
        PERSISTENT_CACHE_PATH = windows_store_path
    else:
        PERSISTENT_CACHE_PATH = os.path.join(windows_appdata_path, "Spotify\\Users")
else:
    # Linux
    # An empty XDG_CACHE_HOME must be treated like an unset one; otherwise
    # the result would be a path relative to the current directory.
    PERSISTENT_CACHE_PATH = os.path.join(
        os.getenv("XDG_CACHE_HOME") or "~/.cache", "spotify/Users"
    )
def parse(data, user_id):
    """
    Parse Spotify rootlist bytes with the folder structure at the start.

    `data`
        Raw bytes to parse, e.g. the contents of a PersistentStorage file.
    `user_id`
        Specify a user id to use for folder URIs. Can also be a
        placeholder value like 'unknown'. (Background: this information
        doesn't seem to be provided in the source data.)

    Returns the root folder as a dict `{"type": "folder", "children": [...]}`.
    Each child is either a playlist `{"type": "playlist", "uri": ...}` or a
    nested folder `{"name": ..., "type": "folder", "uri": ..., "children": [...]}`.

    Unbalanced group markers are tolerated: groups that are never closed are
    closed at the end of the data, and an "end-group" without a matching
    "start-group" is ignored.

    FILE STRUCTURE
    --------------
    The file resembles a binary Protocol Buffers file with some twists.
    Its current structure seems to be as follows:
    1. `00` hexstring.
    2. Spotify version number. Encoded as varint.
       (E.g. `114800625` is version `1.1.48.625`.)
    3. `A40115` hexstring with unknown meaning.
    4. Number of {playlist, start-group, end-group} strings.
       Encoded as varint of `(number << 3) | 001`.
    5. List of any of these three playlist string types:
       - playlist identifier
         (e.g. "spotify:playlist:37i9dQZF1DXdCsscAsbRNz")
       - folder start identifier
         (e.g. "spotify:start-group:8212237ac7347bfe:Summer")
       - folder end identifier
         (e.g. "spotify:end-group:8212237ac7347bfe")
    6. Other content we currently ignore.
    """
    # spotify:playlist, spotify:start-group, spotify:end-group
    rows = re.split(rb"spotify:(?=[pse])", data)
    folder = {"type": "folder", "children": []}
    stack = []
    for row in rows:
        # Only the part before the first '\x12' byte is the identifier.
        # NOTE(review): the original comment named '\x10' as the marker for
        # the end of the entire list while the code splits on '\x12' --
        # confirm which one is intended. This might break in future versions
        # of Spotify. Two alternative solutions one might consider then:
        # 1. Read the length encoded as a varint before each string.
        # 2. Read the number of repeats specified in the beginning of
        #    the file.
        row = row.split(b"\x12", 1)[0]
        if row.startswith(b"playlist:"):
            folder["children"].append(
                {"type": "playlist", "uri": "spotify:" + row.decode("utf-8")}
            )
        elif row.startswith(b"start-group:"):
            stack.append(folder)
            tags = row.split(b":")
            folder = dict(
                # Assuming folder names < 128 characters.
                # Alternatively, do a protobuf varint parser to get length.
                name=unquote_plus(tags[-1].decode("utf-8")),
                type="folder",
                uri=(
                    "spotify:user:%s:folder:" % user_id
                    + tags[-2].decode("utf-8").zfill(16)
                ),
                children=[],
            )
        elif row.startswith(b"end-group:"):
            if not stack:
                # Stray end marker without a matching start -- sometimes a
                # file contains errors. Stay in the current (root) folder.
                continue
            parent = stack.pop()
            parent["children"].append(folder)
            folder = parent
    # close any remaining groups -- sometimes a file contains errors.
    while stack:
        parent = stack.pop()
        parent["children"].append(folder)
        folder = parent
    return folder
def get_all_persistent_cache_files(path):
    """Return paths of all files below `path` containing a "start-group" marker."""
    root = os.path.expanduser(path)
    return [
        candidate
        for parent, _subdirs, names in os.walk(root)
        for candidate in (os.path.join(parent, name) for name in names)
        if find_in_file(b"start-group", candidate)
    ]
def get_folder(folder_id, data):
    """
    Find a folder in the hierarchy produced by `parse()`.

    Searches `data` depth-first and returns the first folder whose URI ends
    with `folder_id`, or None when there is no such folder.
    """
    if data.get("type") != "folder":
        return None
    if data.get("uri", "").endswith(folder_id):
        return data
    for node in data.get("children", []):
        found = get_folder(folder_id, node)
        if found:
            return found
    return None
def find_in_file(string, filepath):
    """
    Tell whether any line of the file at `filepath` contains `string`.

    The file is read in binary mode, so `string` must be bytes. A file that
    cannot be opened or read counts as not containing the string.
    """
    try:
        with open(filepath, mode="rb") as handle:
            return any(string in chunk for chunk in handle)
    except (OSError, IOError):
        return False
def print_info_text(number):
    """Print how many folder hierarchies (`number`) were found on this machine."""
    ending = "ies" if number != 1 else "y"
    text = "Found {number} folder hierarch{suffix} on this machine.".format(
        number=number, suffix=ending
    )
    if number > 1:
        # Hint at how to reach the other accounts.
        text = (
            text
            + "\n\n"
            + "To see the second one, run"
            + "\n\n"
            + " spotifyfolders --account 2\n"
        )
    print(text)
def _process(raw_data, args, user_id="unknown"): | |
# preprocessing | |
if args.folder: | |
uri = args.folder | |
if "/" not in uri and ":" not in uri: | |
print("Specify folder as a URL or Spotify URI. See `--help`.") | |
sys.exit(2) | |
separator = "/" if uri.find("/") > 0 else ":" | |
user_id = uri.split(separator)[-3] | |
folder_id = uri.split(separator)[-1] | |
data = parse(raw_data, user_id=user_id) | |
# postprocessing | |
if args.folder: | |
data = get_folder(folder_id, data) | |
if not data: | |
print("Folder not found :(") | |
sys.exit(1) | |
return json.dumps(data) | |
# -------------------------------------------------- | |
# Bare-bones LevelDB parser | |
# see: https://github.com/google/leveldb/ | |
# -------------------------------------------------- | |
from dataclasses import dataclass | |
import snappy | |
BLOCK_TRAILER_SIZE = 5 # kBlockTrailerSize | |
def read_leveldb_key_in_directory(target_key_prefix, path):
    """
    Yield values stored under keys starting with `target_key_prefix`.

    Walks `path` and, within each directory, visits files from most to
    least recently modified. `.ldb` files contribute whatever
    `read_leveldb_key_in_ldb_file()` yields; `.log` files contribute only
    their last matching value. Files that fail with ValueError or an I/O
    error are skipped.
    """
    # Example prefix: b"!pl#slc#\x1dspotify:user:"
    root = os.path.expanduser(path)
    for parent, _subdirs, names in os.walk(root):
        newest_first = sorted(
            (os.path.join(parent, name) for name in names),
            key=modified,
            reverse=True,
        )
        for candidate in newest_first:
            try:
                if candidate.endswith(".ldb"):
                    yield from read_leveldb_key_in_ldb_file(
                        target_key_prefix, candidate
                    )
                elif candidate.endswith(".log"):
                    updates = [
                        *read_leveldb_key_in_log_file(target_key_prefix, candidate)
                    ]
                    if updates:
                        # get the last update
                        yield updates[-1]
            except (ValueError, OSError, IOError):
                continue
def read_leveldb_key_in_log_file(target_key_prefix, filepath):
    """
    Yield values written under matching keys in a LevelDB `.log` file.

    The log is a sequence of 32 KiB blocks. Each physical record has a
    7-byte header (4-byte checksum, 2-byte little-endian length, 1-byte
    type) followed by `length` payload bytes. A logical record is either a
    single "full" fragment or a "first", any number of "middle" and a "last"
    fragment. Every logical record is decoded as a write `Batch`; the value
    of each "put" whose key starts with `target_key_prefix` is yielded in
    file order.

    Reading stops quietly at the end of the file, at a truncated record or
    at an unknown record type. Checksums are not verified.
    """
    # Example key: !pl#slc#\x1dspotify:user:XXXXX:rootlist#
    log_block_size = 32768  # kBlockSize of the LevelDB log format
    header_size = 4 + 2 + 1  # checksum + length + type
    fragment_types = {1: "full", 2: "first", 3: "middle", 4: "last"}
    with open(filepath, "rb") as file:
        offset = 0
        data = None  # payload of the logical record being reassembled
        while True:
            # A record never starts within the last six bytes of a block;
            # such leftover bytes are zero padding and must be skipped.
            block_left = log_block_size - offset % log_block_size
            if block_left < header_size:
                offset += block_left
                file.seek(offset)
            header = file.read(header_size)
            if len(header) < header_size:
                # End of file, or a header cut short by an interrupted write.
                return
            length = int.from_bytes(header[4:6], byteorder="little")
            block_type = fragment_types.get(header[6])
            if block_type is None:
                return
            payload = file.read(length)
            if len(payload) < length:
                return
            offset += header_size + length
            if block_type in ("full", "first"):
                data = payload
            elif data is not None:
                data += payload
            # A "middle"/"last" fragment without a preceding "first" leaves
            # `data` as None and is dropped.
            if block_type in ("full", "last") and data is not None:
                for operation, args in Batch(data):
                    if operation != "put":
                        continue
                    key, value = args
                    if key.startswith(target_key_prefix):
                        yield value
                data = None
def read_leveldb_key_in_ldb_file(target_key_prefix, filepath):
    """
    Yield the first value whose key starts with `target_key_prefix`.

    Reads a LevelDB table (`.ldb`) file: the footer gives the index block,
    the first index entry not ordered before the prefix gives the data
    block, and that data block is scanned for a matching key.

    Raises ValueError("Key not found.") when no index entry or no data
    entry matches.
    """
    # Example key: !pl#slc#\x1dspotify:user:XXXXX:rootlist#
    with open(filepath, "rb") as table:
        # look in footer
        _metaindex_handle, index_handle = read_footer(table)
        # look in index
        index_block = Block(table, index_handle, values_are_handles=True)
        data_handle = next(
            (
                handle
                for index_key, handle in index_block
                if bytearray_less_or_equal(target_key_prefix, index_key)
            ),
            None,
        )
        if data_handle is None:
            raise ValueError("Key not found.")
        # look in data block
        for entry_key, entry_value in Block(table, data_handle):
            if entry_key.startswith(target_key_prefix):
                yield entry_value
                return
        raise ValueError("Key not found.")
class Batch:
    """
    Iterable view of a serialized LevelDB write batch.

    Iterating yields `("put", (key, value))` or `("delete", (key,))` for
    each record, in stored order.

    https://github.com/google/leveldb/blob/main/db/write_batch.cc

    WriteBatch::rep_ :=
       sequence: fixed64
       count: fixed32
       data: record[count]
    record :=
       kTypeValue varstring varstring |
       kTypeDeletion varstring
    varstring :=
       len: varint32
       data: uint8[len]
    """

    def __init__(self, data):
        self.data = data
        # 12-byte header: 8-byte sequence number, then 4-byte record count.
        self.sequence, after_sequence = read_int(data, offset=0, length=8)
        self.count, self.start_offset = read_int(data, after_sequence, length=4)

    def __iter__(self):
        buffer = self.data
        cursor = self.start_offset
        remaining = self.count
        while remaining > 0:
            remaining -= 1
            tag, cursor = read_int(buffer, cursor, length=1)
            key_size, cursor = read_varint(buffer, cursor)
            key, cursor = read_n_bytes(buffer, cursor, key_size)
            if tag != 1:
                # Anything but a value record carries only a key.
                yield "delete", (key,)
                continue
            value_size, cursor = read_varint(buffer, cursor)
            value, cursor = read_n_bytes(buffer, cursor, value_size)
            yield "put", (key, value)
@dataclass
class BlockHandle:
    """Position and extent of one block inside an `.ldb` file."""

    # Byte position the file is seeked to before reading the block.
    offset: int
    # Number of bytes read for the block contents; the compression byte and
    # checksum that follow are read separately.
    size: int
class Block:
    """
    One block of a LevelDB table (`.ldb`) file.

    The constructor reads `handle.size` bytes at `handle.offset` from
    `file`, then the 1-byte compression type and the checksum that follow,
    and decompresses the contents. The checksum is stored but not verified.

    Iterating yields `(key, value)` pairs in stored order. `key` is the
    stored key without its trailing 8 bytes (which pack the sequence number
    and the operation type). `value` is the raw value bytes, or a
    `BlockHandle` decoded from them when `values_are_handles` is true (as
    for an index block).

    Raises ValueError for an unsupported or unknown compression type.
    """

    def __init__(self, file, handle: "BlockHandle", values_are_handles=False):
        self.handle = handle
        file.seek(handle.offset)
        self.raw_data = file.read(handle.size)
        self.compression = file.read(1)
        self.crc = file.read(BLOCK_TRAILER_SIZE - 1)
        self.values_are_handles = values_are_handles
        self.data = self._decompress(self.compression, self.raw_data)

    @staticmethod
    def _decompress(compression, raw_data):
        """Return block contents decoded according to the compression byte."""
        if compression == b"\x00":
            return raw_data
        if compression == b"\x01":
            return snappy.uncompress(raw_data)
        if compression == b"\x02":
            raise ValueError("Zstandard decompression not implemented")
        raise ValueError(f"Unknown compression {compression!r}")

    def __iter__(self):
        data = self.data
        # The block ends with an array of 4-byte restart points followed by
        # a 4-byte count of them; entries occupy everything before that.
        restarts_count = int.from_bytes(data[-4:], byteorder="little")
        entries_end = len(data) - (restarts_count * 4 + 4)
        offset = 0
        # Keys are prefix-compressed against the PREVIOUS entry's full
        # stored key (including its 8-byte trailer), not against the key at
        # the last restart point.
        previous_stored_key = b""
        while offset < entries_end:
            (shared, unshared, value_length), offset = read_n_varints(data, offset, 3)
            # key
            stored_key = previous_stored_key[:shared] + data[offset : offset + unshared]
            offset += unshared
            key = stored_key[:-8]
            # value
            valuedata = data[offset : offset + value_length]
            offset += value_length
            previous_stored_key = stored_key
            if self.values_are_handles:
                (block_offset, block_size), _ = read_n_varints(valuedata, 0, 2)
                yield key, BlockHandle(block_offset, block_size)
            else:
                yield key, valuedata
def bytearray_less_or_equal(bytearray1, bytearray2):
    """
    Tell whether `bytearray1` sorts before or equal to `bytearray2`.

    In LevelDB, you can specify a custom comparator. It seems Spotify's
    comparator behaves a bit like this: byte-wise comparison in which the
    group separator (0x1D) sorts after every other byte value, with a
    prefix ordered before any longer sequence it starts.
    """
    group_separator = 0x1D

    def rank(byte):
        # Lift the group separator above every regular byte value.
        return 256 if byte == group_separator else byte

    for left, right in zip(bytearray1, bytearray2):
        if left != right:
            return rank(left) < rank(right)
    # One sequence is a prefix of the other: the shorter one comes first.
    return len(bytearray1) <= len(bytearray2)
def read_footer(file):
    """
    Read the footer of a table file.

    Takes the last 48 bytes of `file` and decodes four varints from the
    first 40 of them: offset and size of the metaindex block, then offset
    and size of the index block. Returns `(metaindex_handle, index_handle)`.
    """
    file.seek(-48, 2)
    footer = file.read()
    (meta_offset, meta_size, index_offset, index_size), _ = read_n_varints(
        footer[:40], 0, 4
    )
    metaindex_handle = BlockHandle(meta_offset, meta_size)
    index_handle = BlockHandle(index_offset, index_size)
    return metaindex_handle, index_handle
def read_int(data, offset=0, length=None):
    """
    Decode a little-endian unsigned integer from `data`.

    Reads `length` bytes starting at `offset`; `length` defaults to
    `len(data)`. Returns `(value, offset + length)`.
    """
    width = len(data) if length is None else length
    end = offset + width
    return int.from_bytes(data[offset:end], byteorder="little"), end
def read_n_bytes(data, offset, n):
    """Return `(n bytes of data starting at offset, offset + n)`."""
    end = offset + n
    return data[offset:end], end
def read_varint(data, offset):
    """
    Decode one varint from `data` starting at `offset`.

    Each byte contributes its low 7 bits, least significant group first; a
    set high bit means another byte follows. Returns
    `(value, offset just past the varint)`. Raises IndexError when the data
    ends before the varint does.
    """
    value = 0
    shift = 0
    byte = data[offset]
    while byte & 0x80:
        value |= (byte & 0x7F) << shift
        shift += 7
        offset += 1
        byte = data[offset]
    # Final byte: its high bit is clear, so it can be merged as is.
    value |= byte << shift
    return value, offset + 1
def read_n_varints(data, offset, n):
    """
    Decode `n` consecutive varints from `data` starting at `offset`.

    Returns `(list of values, offset just past the last varint)`.
    """
    values = []
    while len(values) < n:
        number, offset = read_varint(data, offset)
        values.append(number)
    return values, offset
def modified(filepath):
    """
    Return the modification time of `filepath` for use as a sort key.

    A file that no longer exists gets positive infinity.
    """
    try:
        mtime = os.path.getmtime(filepath)
    except FileNotFoundError:
        mtime = float("Inf")
    return mtime
# -------------------------------------------------- | |
# Command line setup | |
# -------------------------------------------------- | |
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=("Get your Spotify folder hierarchy with playlists into JSON."),
        add_help=False,
    )
    parser.add_argument(
        "folder",
        default=None,
        metavar="folder",
        nargs="?",
        help=(
            "Get only a specific Spotify folder. If omitted, returns entire "
            "hierarchy. A folder is specified by its URL or URI. "
            "Obtain this by dragging a folder into a Terminal window. "
            "Alternatively, click on a folder in Spotify and do Cmd+C."
        ),
    )
    parser.add_argument(
        "-i",
        "--info",
        action="store_const",
        const=True,
        default=False,
        help="Information about Spotify folders on this machine.",
    )
    # NOTE(review): `--account` is still accepted, but the LevelDB lookup
    # below does not use it yet; it always takes the first value found.
    parser.add_argument(
        "-a",
        "--account",
        dest="account",
        default="1",
        help=(
            "Sometimes a machine has multiple Spotify accounts. This gets a "
            "Spotify folder hierachy of a specific account. 1 is the first, "
            "2 is the second, etc. To see how many accounts there are, "
            "use the `-i` flag."
        ),
    )
    parser.add_argument(
        "--cache",
        dest="cache_dir",
        default=PERSISTENT_CACHE_PATH,
        help="Specify a custom PersistentCache directory to look for data in.",
    )
    parser.add_argument(
        "-h",
        "--help",
        action="help",
        default=argparse.SUPPRESS,
        help="Show this help message and exit.",
    )
    args = parser.parse_args()

    if args.info:
        # Scan the cache only when the count is actually requested; without
        # this assignment `--info` failed on an undefined `cache_files`.
        cache_files = get_all_persistent_cache_files(args.cache_dir)
        print_info_text(len(cache_files))
    else:
        first_value = next(
            read_leveldb_key_in_directory(b"!pl#slc#\x1dspotify:user:", args.cache_dir),
            None,
        )
        if first_value is None:
            # Nothing matched in any LevelDB file; report it instead of
            # handing None to the parser.
            print(
                "No data found in Spotify cache. If you have a custom cache "
                "directory set, specify its path with the `--cache` flag. "
                "Also, in the Spotify app, check "
                "Settings -> Offline storage location."
            )
            sys.exit(2)
        print(_process(first_value, args))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment