@mikez
Created December 3, 2023
Experimental Spotify folders with LevelDB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Get your Spotify folder hierarchy with playlists into JSON.
:license: MIT, see LICENSE for more details.
"""
from __future__ import print_function
import argparse
import json
import os
import re
import sys
try:
    from urllib import unquote_plus  # Python 2
except ImportError:
    from urllib.parse import unquote_plus  # Python 3
if sys.platform == "darwin":
    # Mac
    PERSISTENT_CACHE_PATH = (
        "~/Library/Application Support/Spotify/PersistentCache/Users"
    )
elif sys.platform == "win32":
    # Windows, via Microsoft Store or standalone
    windows_appdata_path = os.getenv("LOCALAPPDATA")
    windows_store_path = os.path.join(
        windows_appdata_path,
        "Packages\\SpotifyAB.SpotifyMusic_zpdnekdrzrea0\\LocalState\\Spotify\\Users",
    )
    if os.path.exists(windows_store_path):
        PERSISTENT_CACHE_PATH = windows_store_path
    else:
        PERSISTENT_CACHE_PATH = os.path.join(windows_appdata_path, "Spotify\\Users")
else:
    # Linux
    PERSISTENT_CACHE_PATH = os.path.join(
        os.getenv("XDG_CACHE_HOME", "~/.cache"), "spotify/Users"
    )

def parse(data, user_id):
    """
    Parse the contents of a Spotify PersistentStorage file with the
    folder structure at its start.

    `data`
        Raw bytes of a PersistentStorage file.
    `user_id`
        Specify a user id to use for folder URIs. Can also be a
        placeholder value like 'unknown'. (Background: this information
        doesn't seem to be provided in the source file.)

    FILE STRUCTURE
    --------------
    The file resembles a binary Protocol Buffers file with some twists.
    Its current structure seems to be as follows:

    1. `00` hexstring.
    2. Spotify version number, encoded as a varint.
       (E.g. `114800625` is version `1.1.48.625`.)
    3. `A40115` hexstring with unknown meaning.
    4. Number of {playlist, start-group, end-group} strings,
       encoded as a varint of `(number << 3) | 001`.
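       (E.g., assuming that encoding, 7 strings would be stored as the
       varint of `(7 << 3) | 1 = 57`, i.e. the single byte `0x39`.)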
    5. List of any of these three playlist string types:
       - playlist identifier
         (e.g. "spotify:playlist:37i9dQZF1DXdCsscAsbRNz")
       - folder start identifier
         (e.g. "spotify:start-group:8212237ac7347bfe:Summer")
       - folder end identifier
         (e.g. "spotify:end-group:8212237ac7347bfe")
    6. Other content we currently ignore.
    """
    # with open(file_name, "rb") as data_file:
    #     data = data_file.read()
    # spotify:playlist, spotify:start-group, spotify:end-group
    rows = re.split(rb"spotify:(?=[pse])", data)
    folder = {"type": "folder", "children": []}
    stack = []
    for index, row in enumerate(rows):
        # Note: '\x10' marks the end of the entire list. This might
        # break in future versions of Spotify. Here are two alternative
        # solutions one might consider then:
        #   1. Read the length encoded as a varint before each string.
        #   2. Read the number of repeats specified in the beginning of
        #      the file.
        chunks = row.split(b"\x12", 1)
        row = chunks[0]
        if row.startswith(b"playlist:"):
            folder["children"].append(
                {"type": "playlist", "uri": "spotify:" + row.decode("utf-8")}
            )
        elif row.startswith(b"start-group:"):
            stack.append(folder)
            tags = row.split(b":")
            folder = dict(
                # Assuming folder names < 128 characters.
                # Alternatively, write a protobuf varint parser to get
                # the length.
                name=unquote_plus(tags[-1].decode("utf-8")),
                type="folder",
                uri=(
                    "spotify:user:%s:folder:" % user_id
                    + tags[-2].decode("utf-8").zfill(16)
                ),
                children=[],
            )
        elif row.startswith(b"end-group:"):
            parent = stack.pop()
            parent["children"].append(folder)
            folder = parent
        # if folder.get("children") and len(chunks) > 1:
        #     break
    # Close any remaining groups -- sometimes a file contains errors.
    while len(stack) > 0:
        parent = stack.pop()
        parent["children"].append(folder)
        folder = parent
    return folder

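# A minimal sketch of what `parse` returns, fed with made-up bytes in the
# shape described in the docstring (ids taken from the examples above):
#
#     data = (
#         b"spotify:start-group:8212237ac7347bfe:Summer\x12"
#         b"spotify:playlist:37i9dQZF1DXdCsscAsbRNz\x12"
#         b"spotify:end-group:8212237ac7347bfe\x10"
#     )
#     parse(data, user_id="unknown")
#     # => {"type": "folder", "children": [
#     #        {"name": "Summer", "type": "folder",
#     #         "uri": "spotify:user:unknown:folder:8212237ac7347bfe",
#     #         "children": [{"type": "playlist",
#     #                       "uri": "spotify:playlist:37i9dQZF1DXdCsscAsbRNz"}]}]}
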
def get_all_persistent_cache_files(path):
    """Get all files in PersistentCache storage with "start-group" marker."""
    result = []
    path = os.path.expanduser(path)
    for dirpath, dirnames, filenames in os.walk(path):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            if find_in_file(b"start-group", filepath):
                result.append(filepath)
    return result

def get_folder(folder_id, data):
    """Get a specific folder in data output by `parse()`."""
    data_type = data.get("type")
    if data_type == "folder":
        if data.get("uri", "").endswith(folder_id):
            return data
        for child in data.get("children", []):
            folder = get_folder(folder_id, child)
            if folder:
                return folder

def find_in_file(string, filepath):
    """Check if a file contains the given string."""
    try:
        with open(filepath, mode="rb") as f:
            for line in f:
                if string in line:
                    return True
    except (OSError, IOError):
        return False
    return False

def print_info_text(number):
    """Prints info text for `number` of PersistentCache storage files."""
    suffix = "y" if number == 1 else "ies"
    message = "Found {number} folder hierarch{suffix} on this machine.".format(
        number=number, suffix=suffix
    )
    if number > 1:
        message += "\n\nTo see the second one, run\n\n spotifyfolders --account 2\n"
    print(message)

def _process(raw_data, args, user_id="unknown"):
    # preprocessing
    if args.folder:
        uri = args.folder
        if "/" not in uri and ":" not in uri:
            print("Specify folder as a URL or Spotify URI. See `--help`.")
            sys.exit(2)
        separator = "/" if uri.find("/") > 0 else ":"
        user_id = uri.split(separator)[-3]
        folder_id = uri.split(separator)[-1]
    data = parse(raw_data, user_id=user_id)
    # postprocessing
    if args.folder:
        data = get_folder(folder_id, data)
        if not data:
            print("Folder not found :(")
            sys.exit(1)
    return json.dumps(data)

# --------------------------------------------------
# Bare-bones LevelDB parser
# see: https://github.com/google/leveldb/
# --------------------------------------------------
from dataclasses import dataclass

import snappy

BLOCK_TRAILER_SIZE = 5  # kBlockTrailerSize


def read_leveldb_key_in_directory(target_key_prefix, path):
    # target_key_prefix = b"!pl#slc#\x1dspotify:user:"
    path = os.path.expanduser(path)
    for dirpath, dirnames, filenames in os.walk(path):
        filepaths = [os.path.join(dirpath, filename) for filename in filenames]
        for filepath in sorted(filepaths, key=modified, reverse=True):
            try:
                if filepath.endswith(".ldb"):
                    yield from read_leveldb_key_in_ldb_file(
                        target_key_prefix, filepath
                    )
                elif filepath.endswith(".log"):
                    values = list(
                        read_leveldb_key_in_log_file(target_key_prefix, filepath)
                    )
                    if values:
                        # get the last update
                        yield values[-1]
                else:
                    continue
            except (ValueError, OSError, IOError):
                continue

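# For reference, the log format this reader assumes (doc/log_format.md in
# the LevelDB repository):
#
#     record := checksum (4 bytes) + length (2 bytes, little-endian)
#               + type (1 byte: FULL, FIRST, MIDDLE or LAST) + data
#
# Records are packed into 32 KiB blocks. This bare-bones reader walks
# records back to back and ignores those block boundaries, so it may
# misparse a log whose records straddle a boundary.
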
def read_leveldb_key_in_log_file(target_key_prefix, filepath):
    # print("LOG")
    # !pl#slc#\x1dspotify:user:XXXXX:rootlist#
    with open(filepath, "rb") as file:
        offset = 0
        data = b""  # guards against a file starting mid-record
        while True:
            crc = file.read(4)
            if not crc:
                return
            length, _ = read_int(file.read(2))
            block_type_raw = file.read(1)[0]
            try:
                block_type = {1: "full", 2: "first", 3: "middle", 4: "last"}[
                    block_type_raw
                ]
            except KeyError:
                return
            offset += 4 + 2 + 1
            file.seek(offset)
            if block_type == "first" or block_type == "full":
                data = file.read(length)
            elif block_type == "middle" or block_type == "last":
                data += file.read(length)
            if block_type in ("full", "last"):
                for operation, args in Batch(data):
                    if not operation == "put":
                        continue
                    key, value = args
                    if key.startswith(target_key_prefix):
                        # print("FOUND", filepath)
                        yield value
            offset += length
            file.seek(offset)

def read_leveldb_key_in_ldb_file(target_key_prefix, filepath):
    # print("LDB")
    # !pl#slc#\x1dspotify:user:XXXXX:rootlist#
    with open(filepath, "rb") as file:
        # look in footer
        metaindex_handle, index_handle = read_footer(file)
        # look in index
        for key, data_handle in Block(file, index_handle, values_are_handles=True):
            if bytearray_less_or_equal(target_key_prefix, key):
                break
        else:
            raise ValueError("Key not found.")
        # look in data block
        for key, value in Block(file, data_handle):
            if key.startswith(target_key_prefix):
                # print("FOUND", filepath)
                yield value
                return
        else:
            raise ValueError("Key not found.")

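# Why the index scan above works: in an .ldb table, each index entry's key
# is >= the last key of its data block, so the first index key that
# compares >= the target (per the custom comparator) identifies the only
# data block that could contain that key.
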
class Batch:
    """
    https://github.com/google/leveldb/blob/main/db/write_batch.cc

    WriteBatch::rep_ :=
        sequence: fixed64
        count: fixed32
        data: record[count]
    record :=
        kTypeValue varstring varstring |
        kTypeDeletion varstring
    varstring :=
        len: varint32
        data: uint8[len]
    """

    def __init__(self, data):
        self.data = data
        self.sequence, offset = read_int(data, offset=0, length=8)
        self.count, self.start_offset = read_int(data, offset, length=4)

    def __iter__(self):
        data = self.data
        offset = self.start_offset
        for i in range(self.count):
            operation, offset = read_int(data, offset, length=1)
            length, offset = read_varint(data, offset)
            key, offset = read_n_bytes(data, offset, length)
            if operation == 1:  # operation == value
                length, offset = read_varint(data, offset)
                value, offset = read_n_bytes(data, offset, length)
                yield "put", (key, value)
            else:
                yield "delete", (key,)

@dataclass
class BlockHandle:
    offset: int
    size: int

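# For reference, the block layout the iterator below assumes (from
# table_format.md and block_builder.cc in the LevelDB repository): each
# entry is three varints -- the number of key bytes shared with the
# previous entry, the number of unshared key bytes, and the value length
# -- followed by the unshared key bytes and the value. The block ends
# with an array of fixed32 restart offsets plus a fixed32 restart count,
# and each stored key carries an 8-byte trailer packing the sequence
# number and operation type.
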
class Block:
    def __init__(self, file, handle: BlockHandle, values_are_handles=False):
        self.handle = handle
        file.seek(handle.offset)
        self.raw_data = file.read(handle.size)
        self.compression = file.read(1)
        self.crc = file.read(BLOCK_TRAILER_SIZE - 1)
        self.values_are_handles = values_are_handles
        match self.compression:
            case b"\x00":
                self.data = self.raw_data
            case b"\x01":
                self.data = snappy.uncompress(self.raw_data)
            case b"\x02":
                raise ValueError("Zstandard decompression not implemented")
            case _:
                raise ValueError(f"Unknown compression {self.compression!r}")

    def __iter__(self):
        data = self.data
        # restart points
        restarts_count = int.from_bytes(data[-4:], byteorder="little")
        restarts_suffix_length = restarts_count * 4 + 4
        # keys and values
        offset = 0
        previous_key = b""
        while offset < len(data) - restarts_suffix_length:
            (shared, unshared, value_length), offset = read_n_varints(data, offset, 3)
            # key: shared prefix of the previous key + unshared suffix
            fullkey = previous_key[:shared] + data[offset : offset + unshared]
            previous_key = fullkey
            # the final 8 bytes pack the sequence number and operation type
            key = fullkey[:-8]
            sequence_number = int.from_bytes(fullkey[-8:], byteorder="little")
            operation_type = sequence_number & 0xFF
            sequence_number >>= 8
            offset += unshared
            # value
            valuedata = data[offset : offset + value_length]
            if self.values_are_handles:
                (block_offset, block_size), _ = read_n_varints(valuedata, 0, 2)
                yield key, BlockHandle(block_offset, block_size)
            else:
                yield key, valuedata
            offset += value_length

def bytearray_less_or_equal(bytearray1, bytearray2):
    """
    In LevelDB, you can specify a custom comparator.
    It seems Spotify's comparator behaves a bit like this.
    """
    # Compare byte by byte.
    group_separator = 0x1D
    for byte1, byte2 in zip(bytearray1, bytearray2):
        if byte1 == group_separator and byte2 != group_separator:
            return False
        if byte1 != group_separator and byte2 == group_separator:
            return True
        if byte1 < byte2:
            return True
        elif byte1 > byte2:
            return False
    # If all bytes are equal, check the length.
    return len(bytearray1) <= len(bytearray2)

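# Illustrations with made-up keys: the 0x1D group separator outranks
# ordinary bytes, unlike a plain lexicographic comparison.
#
#     bytearray_less_or_equal(b"a\x1db", b"ab")  # False: 0x1d sorts last
#     bytearray_less_or_equal(b"ab", b"a\x1db")  # True
#     bytearray_less_or_equal(b"abc", b"abcd")   # True: prefix sorts first
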
def read_footer(file):
    # The footer is the last 48 bytes of the file: two varint-encoded
    # block handles (metaindex, then index), zero-padding, and an 8-byte
    # magic number we don't check here.
    file.seek(-48, 2)
    last_48_bytes = file.read()
    integers, _ = read_n_varints(last_48_bytes[:40], 0, 4)
    metaindex_handle = BlockHandle(*integers[:2])
    index_handle = BlockHandle(*integers[2:])
    return metaindex_handle, index_handle

def read_int(data, offset=0, length=None):
    if length is None:
        length = len(data)
    stop_offset = offset + length
    value = int.from_bytes(data[offset:stop_offset], byteorder="little")
    return value, stop_offset

def read_n_bytes(data, offset, n):
    return data[offset : offset + n], offset + n

def read_varint(data, offset):
    value = 0
    shift = 0
    while True:
        byte = data[offset]
        value |= (byte & 0x7F) << shift
        offset += 1
        if not (byte & 0x80):
            break
        shift += 7
    return value, offset

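# For example, the version number 114800625 (i.e. 1.1.48.625) from the
# `parse` docstring would be encoded as the four bytes f1 ef de 36:
#
#     read_varint(b"\xf1\xef\xde\x36", 0)
#     # => (114800625, 4)
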
def read_n_varints(data, offset, n):
    result = []
    for _ in range(n):
        value, offset = read_varint(data, offset)
        result.append(value)
    return result, offset

def modified(filepath):
    try:
        return os.path.getmtime(filepath)
    except FileNotFoundError:
        return float("Inf")

# --------------------------------------------------
# Command line setup
# --------------------------------------------------
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Get your Spotify folder hierarchy with playlists into JSON.",
        add_help=False,
    )
    parser.add_argument(
        "folder",
        default=None,
        metavar="folder",
        nargs="?",
        help=(
            "Get only a specific Spotify folder. If omitted, returns the entire "
            "hierarchy. A folder is specified by its URL or URI. "
            "Obtain this by dragging a folder into a Terminal window. "
            "Alternatively, click on a folder in Spotify and do Cmd+C."
        ),
    )
    parser.add_argument(
        "-i",
        "--info",
        action="store_const",
        const=True,
        default=False,
        help="Information about Spotify folders on this machine.",
    )
    parser.add_argument(
        "-a",
        "--account",
        dest="account",
        default="1",
        help=(
            "Sometimes a machine has multiple Spotify accounts. This gets a "
            "Spotify folder hierarchy of a specific account. 1 is the first, "
            "2 is the second, etc. To see how many accounts there are, "
            "use the `-i` flag."
        ),
    )
    parser.add_argument(
        "--cache",
        dest="cache_dir",
        default=PERSISTENT_CACHE_PATH,
        help="Specify a custom PersistentCache directory to look for data in.",
    )
    parser.add_argument(
        "-h",
        "--help",
        action="help",
        default=argparse.SUPPRESS,
        help="Show this help message and exit.",
    )
    args = parser.parse_args()

    if args.info:
        cache_files = get_all_persistent_cache_files(args.cache_dir)
        print_info_text(len(cache_files))
    else:
        # if not args.account.isdigit() or int(args.account) == 0:
        #     print("Specify account as a positive number. See `--help`.")
        #     sys.exit(2)
        # cache_file_index = int(args.account) - 1
        # if cache_file_index >= len(cache_files):
        #     print(
        #         "No data found in Spotify cache. If you have a custom cache "
        #         "directory set, specify its path with the `--cache` flag. "
        #         "Also, in the Spotify app, check "
        #         "Settings -> Offline storage location."
        #     )
        #     sys.exit(2)
        # cache_file_name = cache_files[cache_file_index]
        # print(_process(cache_file_name, args))
        first_value = next(
            read_leveldb_key_in_directory(b"!pl#slc#\x1dspotify:user:", args.cache_dir),
            None,
        )
        if first_value is None:
            print(
                "No data found in Spotify cache. If you have a custom cache "
                "directory set, specify its path with the `--cache` flag."
            )
            sys.exit(2)
        # from filetools import save_bytes
        print(_process(first_value, args))
        # save_bytes(first_value, "/Users/mike/tmp/sandbox/spotify/example.bin")
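
# Usage sketch (assuming this script is saved and invoked as
# `spotifyfolders`, the name used in `print_info_text` above):
#
#     spotifyfolders                   # entire hierarchy as JSON
#     spotifyfolders -i                # count hierarchies on this machine
#     spotifyfolders <folder URL/URI>  # just one folder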