Skip to content

Instantly share code, notes, and snippets.

@Gemba
Created August 31, 2022 15:30
Show Gist options
  • Save Gemba/52c7fbf1b2c3be26f324eca09d7acaf4 to your computer and use it in GitHub Desktop.
Save Gemba/52c7fbf1b2c3be26f324eca09d7acaf4 to your computer and use it in GitHub Desktop.
Creates a custom collection of recently added games for RetroPie/EmulationStation.
#! /usr/bin/env python3
# Copyright (C) 2022 Gemba, https://github.com/Gemba
# Contains portions of crtime. Copyright (C) 2019 Pascal van Kooten,
# https://github.com/kootenpv/crtime
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# This script creates a static collection for EmulationStation with the games
# added during a selectable period.
#
# Setup
# -----
# 1. sudo apt install python3-pandas python3-lxml
# 2. Review/adjust ALL_ROMS_CSV, DEVICE, ROMS_HOME, ES_SETTINGS. Details next
# 3. Have a recent CSV with your ROMs and adjust path to it (see ALL_ROMS_CSV
# below), or create one with the --csv option
# 4. Linux only: Identify the partition where your ROMs are stored and adjust
# the variable DEVICE (see below)
# 5. If not on a RetroPie/Linux adjust ROMS_HOME and ES_SETTINGS (see below).
# The latter ist only needed for the --csv option
#
# Running
# -------
# 1. See --help output of this script, run with sudo, as debugfs needs elevated
# privileges.
# 2. When satisfied, copy the generated file to ~/emulationstation/collections
# 3. Restart ES and enable the collection in the EmulationStation custom
# collection settings
#
# Limitations
# -----------
# - As the Rpi has no real-time clock the results are limited to how accurate
# you set the clock on bootup of the Rpi.
# - The result is a static list, thus rerun it whenever you added roms to your
# setup and have the CSV updated.
# - The collection in ES is sorted alphabetical by default, as no "creation
# time" is stored in the gamelist files.
# - When using --dry-run the matches contain all multidisc entries.
# - The --csv creation may include files which are not present in a folders
# gamelist.xml. Also the --csv does not recognize ROMs in subfolders.
import argparse
import csv
import os
import pandas as pd
import platform
import re
import subprocess
import sys
import tempfile
import time
from datetime import datetime
from lxml import etree
from pathlib import Path
# CSV file to ingest, adjust file location to needs.
# CSV must contain atleast columns "Filename" and "Folder".
# Filename is the ROM file (no path information)
# Folder is the platform (amiga, megadrive, snes, ...)
# File can also created with --csv option on a RetroPie/Rpi/ES setup.
ALL_ROMS_CSV = "/home/pi/all_roms.csv"
# partition of ROMS_HOME (see below)
# Linux only, check with output of 'mount'
# When run from an SDcard: DEVICE = "/dev/mmcblk0p2"
DEVICE = "/dev/sda1"
# Adjust to needs if _not_ on a Rpi/Linux
ROMS_HOME = Path("/home/pi/RetroPie/roms")
# Only needs to be accurate for ---csv option, leave /etc as failsafe as last
# entry. Where to find emulationstation/es_settings.cfg
ES_SETTINGS = [
"/opt/retropie/configs/all",
"/etc"
]
# Will be suffixed with "recently" or "since <YYYY-MM-DD>"
COLLECTION_PREFIX = "Added "
# Trivial RE to filter out multidisc games from disc 2 onwards
MULTIDISK = re.compile(r".+Dis[c|k]\s[2-9].+")
def init_cli_parser():
"""Initialize the argument parser."""
def check_positive(value):
intv = None
try:
intv = int(value)
except:
pass
if not intv or intv <= 0:
raise argparse.ArgumentTypeError(
f"[!] Got {value}. Should only be int greater than 0.")
return intv
def check_date(s):
try:
return datetime.strptime(s, "%Y-%m-%d")
except ValueError:
raise argparse.ArgumentTypeError(
f"[!] Got {s}. Should only be ISO-8601 date string.")
parser = argparse.ArgumentParser(
description='Creates a custom collection with recently added ROMs '
'by utilizing a ROMs creation time on filesystem.')
group = parser.add_mutually_exclusive_group()
group.add_argument('months_ago', nargs='?',
type=check_positive, default=0,
help='Include ROMs months ago most recent ROM-file '
'datetime or now (see --now). Either use this months '
'period or provide start date with --after')
group.add_argument('-a', '--after', dest='after', nargs=1,
type=check_date,
help='Use provided date (YYYY-MM-DD) after which ROMs '
'are included instead of months period')
parser.add_argument("-n", "--now",
help='If true use current datetime instead of most '
'recent ROM-file datetime',
action='store_true', default=False)
parser.add_argument('-l', '--limit', dest='limit', nargs=1,
type=check_positive,
help='Limit number of included ROMs of period. Counted '
'from most recently added descending')
parser.add_argument("-c", "--csv", help=f'Create a CSV of all ROMs in '
f'{ALL_ROMS_CSV}', action='store_true', default=False)
parser.add_argument("-v", "--verbose",
help='Be more chatty',
action='store_true', default=False)
parser.add_argument("-d", "--dry-run", dest='dry_run',
help='Do not write collection file',
action='store_true', default=False)
return parser
def parse_output(output, as_epoch):
"""Parses the command line output of debugfs."""
fname = None
results = {}
for line in output.split("\n"):
if line.startswith("debugfs: stat"):
fname = line[14:]
elif line.startswith("crtime:"):
crtime = line.split("-- ")[1]
if as_epoch:
crtime = int(time.mktime(time.strptime(crtime)))
results[fname.strip('"')] = crtime
return results
def get_crtimes(fnames, raise_on_error=True, as_epoch=False):
"""Identifies the creation dates of files."""
if system == "Windows":
return [(fname, os.stat(fname).st_ctime) for fname in fnames]
elif system != "Linux":
return [(fname, os.stat(fname).st_birthtime) for fname in fnames]
with tempfile.NamedTemporaryFile() as f:
f.write(("\n".join('stat "{}"'.format(x)
for x in fnames) + "\n").encode())
f.flush()
cmd = ["debugfs", "-f", f.name, DEVICE]
with open(os.devnull, "w") as devnull:
output = subprocess.check_output(cmd, stderr=devnull)
results = parse_output(output.decode("utf8"), as_epoch)
if raise_on_error:
for fname in fnames:
if fname in results:
continue
raise ValueError(f'filename "{fname}" does not have a crtime')
return [results.get(fname) for fname in fnames]
def to_absolute_filename(fn, folder):
abs_rom_fn = ROMS_HOME / folder / fn
return str(abs_rom_fn)
def print_matches(crtime, absfile):
print(f" {crtime.strftime('%Y-%m-%d %H:%M')}: "
f"{Path(absfile).relative_to(ROMS_HOME)}")
def write_collection(df):
"""Writes custom collection file."""
of = f"custom-{COLLECTION_PREFIX}{collection_suffix}.cfg"
ctr = 0
with open(of, 'w') as fh:
for k in df.iterrows():
fn = k[1][0]
if MULTIDISK.match(fn):
print(f"[!] Skipping multidisc entry "
f"{Path(fn).relative_to(ROMS_HOME)}")
continue
ctr += 1
fh.write(f"{fn}\n")
if args.verbose:
print(f" {k[0].strftime('%Y-%m-%d %H:%M')}: "
f"{Path(fn).relative_to(ROMS_HOME)}")
if args.limit and ctr == args.limit[0]:
break
print(f"[+] Written {ctr} entr{'y' if ctr == 1 else 'ies'} to '{of}'.\n"
" Do copy to ~/.emulationstation/collections.")
def get_settings_cfg():
"""Locates a es_settings.cfg file."""
es_cfg_file = None
es_config_path = "emulationstation/es_systems.cfg"
for e in [ep for ep in [Path(es).joinpath(es_config_path)
for es in ES_SETTINGS] if ep.is_file()]:
es_cfg_file = e
# no pathlib.Path.is_relative_to() in Rpi/Buster
if str(e).startswith(ES_SETTINGS[-1]):
print(f"[!] Using generic settings from {e}.")
break
if not es_cfg_file:
print("[!] No es_systems.cfg file found on this system. I quit.")
sys.exit(1)
return es_cfg_file
def read_rom_extensions(es_cfg_file):
"""Read extensions from emulationstation es_systems.cfg."""
rom_extensions = {}
with open(es_cfg_file) as fh:
xml = etree.parse(fh)
for system_xml in xml.findall('system'):
folder = [sn.text for sn in system_xml.iterchildren()
if sn.tag == 'name'][0]
if folder in ["retropie", "kodi"]:
continue
ext_strings = system_xml.find('extension').text
exts = set(ext_strings.split())
rom_extensions[folder] = sorted(list(exts), key=str)
return rom_extensions
def create_csv():
"""Writes a CSV with Folder and Filename column."""
rom_extensions = read_rom_extensions(get_settings_cfg())
print(f"[*] Creating '{ALL_ROMS_CSV}' ...")
with open(ALL_ROMS_CSV, 'w') as csvfile:
wr = csv.writer(csvfile, quoting=csv.QUOTE_ALL, quotechar='"')
wr.writerow(('Folder', 'Filename'))
for folder, exts in rom_extensions.items():
roms_in_folder = [r for roms in [(ROMS_HOME / folder).glob(f"*{e}")
for e in exts] for r in roms]
k = len(roms_in_folder)
for rf in roms_in_folder:
wr.writerow((folder, f"{rf}"))
if args.verbose and k:
print(f" {folder:14s}: {k:4d} ROM{'' if k == 1 else 's'}")
print("[+] ... done")
if __name__ == "__main__":
parser = init_cli_parser()
args = parser.parse_args(args=None if sys.argv[1:] else ['-h'])
system = platform.system()
if system == "Linux" and not os.environ.get("SUDO_USER", False):
raise ValueError("[!] Should be run as sudo user on linux.")
if args.csv:
create_csv()
df = pd.read_csv(ALL_ROMS_CSV)
fn_folder = zip(df['Filename'], df['Folder'])
df['absfile'] = [to_absolute_filename(
fn, folder) for fn, folder in fn_folder]
df['crtime'] = get_crtimes(df['absfile'].values)
df = df[['absfile', 'crtime']]
df['crtime'] = pd.to_datetime(df['crtime'], infer_datetime_format=True)
if args.now:
before = pd.to_datetime('today')
else:
before = df['crtime'].max()
if args.months_ago:
after = before + pd.DateOffset(months=-args.months_ago)
collection_suffix = "recently"
elif args.after:
after = pd.to_datetime(args.after[0])
collection_suffix = f"since {after.strftime('%Y-%m-%d')}"
else:
print(f"[!] Provide either months or --after parameter.")
sys.exit(1)
print(f"[*] Including from {after.strftime('%Y-%m-%d %H:%M')}")
print(f" to {before.strftime('%Y-%m-%d %H:%M')}")
df = df.set_index(['crtime'])
# sorted from least recent to most recent
df = df.sort_index(level=2, sort_remaining=True)
before_idx = df.index.searchsorted(before)
after_idx = df.index.searchsorted(after)
df = df.iloc[after_idx:before_idx + 1]
# sort descending to cut off right set when limit is set, i.e.
# sorted from most recent to least recent
df = df.sort_index(level=2, sort_remaining=True, ascending=False)
if args.dry_run:
print(f"[+] {df.shape[0]} ROM{'' if df.shape[0] ==1 else 's'} matched.")
if args.verbose:
[print_matches(crtime, absfile)
for crtime, absfile in zip(df.index, df['absfile'])]
print("[*] No file written.")
else:
if df.shape[0]:
write_collection(df)
else:
print("[-] No matches. No file written.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment