@KokoseiJ
Last active March 1, 2023 07:22
Stepmania MemoryCard mount analysis

meow

  • MemoryCardManager.cpp/ThreadedMemoryCardWorker::DoHeartbeat

  • MemoryCardDriver::DoOneUpdate

  • MemoryCardDriverThreaded_Linux::GetUSBStorageDevices

  • ThreadedMemoryCardWorker inherits RageWorkerThread, which seems to be a worker thread that supports timeouts

    • It runs heartbeat every 0.1 seconds it seems?
    • It waits 0.1 seconds from when the last heartbeat function has finished
    • Running heartbeat means running DoHeartbeat method
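A minimal Python sketch of the timing described above, assuming the 0.1 second wait really does start only once the previous heartbeat has returned. This is not RageWorkerThread's actual code; `run_heartbeats` and its parameters are made-up names for illustration only.

```python
import time


def run_heartbeats(do_heartbeat, interval=0.1, max_beats=None, sleep=time.sleep):
    """Call do_heartbeat repeatedly, sleeping `interval` seconds after each call returns.

    max_beats and sleep are hypothetical knobs so the loop can be stopped
    and observed; the real worker presumably runs until shutdown.
    """
    beats = 0
    while max_beats is None or beats < max_beats:
        do_heartbeat()   # stands in for DoHeartbeat
        beats += 1
        sleep(interval)  # the wait is measured from the end of the heartbeat
    return beats
```

Because the sleep starts after the heartbeat finishes, a slow heartbeat (for example one stuck waiting on sysfs for 5 seconds) pushes every later heartbeat back by the same amount, so the real period is 0.1 seconds plus however long the heartbeat took.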
  1. In DoHeartbeat, it runs DoOneUpdate with a fresh aStorageDevices vector
  2. In DoOneUpdate, it checks NeedUpdate to determine if testing is needed
    1. NeedUpdate checks whether any of the devices is in the CHECKING state. If none is, it runs USBStorageDevicesChanged
    2. USBStorageDevicesChanged iterates through every device in /sys/block, runs stat() on it to check if it's valid, and appends its inode, followed by a comma, to a string. It then compares that string with the one from the previous run to detect device changes in /sys/block.
  3. Then DoOneUpdate finally runs GetUSBStorageDevices, with the vector initialized in #1 as arg
    1. It iterates through the devices available in /sys/block/ and skips any device whose /removable value is set to 0, filtering out non-USB devices
    2. It checks for the /queue sysfs directory, whose presence supposedly indicates that the partition table has been scanned. WE THINK THIS MIGHT BE FALSE!
      1. It repeats the check every 0.01 seconds for 5 seconds. After that, it throws timeout and leaves the loop.
      2. If sysfs folder disappears, it means the device is gone before the scan was done. Throws warning and leaves the loop.
      3. If the queue folder is accessible, the scan must've been done! Hooray! Leaves the loop like a champ.
    3. It runs udevadm settle to wait for device node creation to complete.
    4. Then it determines if the /sys/block/{dev}/{dev}1 folder (first partition) is a thing. If so, it sets the device node to /dev/{dev}1, else /dev/{dev}. Nice job writing a log that is not ambiguous at all, 2005 Stepmania team... This piece of code is almost older than me and it shows.
    5. Then it reads information like bus/port/level, vendor and stuff... We don't need to know about this, just check that everything looks as it should be.
    6. Then these guys open up the fstab and see if there's a mountpoint corresponding to the device. The by-path entries are just symlinks to their /dev/{device} nodes, so if there's a match they should know the OS mount path for it.
    7. They check and weed out the devices whose OsMountDir Stepmania doesn't know. Then it's done!
  4. Then it iterates through the devices freshly received from the function above. Judging by the behaviour of Stepmania, anything from this line is unrelated to the actual issue.
  5. If the device is not in the known list of devices, set its state to CHECKING so that the main thread is aware of it? Idk what this part is for
  6. If we've seen it already (state set to CHECKING) and the method was invoked with bMount==True, which allows mounting, try mounting. If that fails, throw an error and continue iterating.
  7. Check if the device is writeable, read the profile name, then unmount it.
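Steps 4 to 7 above can be sketched roughly like this. It is a guess at the shape of the logic based only on these notes, not a port of the C++ code: `update_device_states`, the callbacks, and the `"READY"` / `"ERROR"` state names are all placeholders (only CHECKING comes from the notes).

```python
def update_device_states(previous, detected, allow_mount, mount, probe, unmount):
    """Return {device: state} for this pass.

    previous  -- {device: state} left over from the last pass
    detected  -- device paths returned by this pass's device scan
    mount, probe, unmount -- hypothetical callbacks taking a device path;
                             mount returns True on success
    """
    states = {}
    for dev in detected:
        if dev not in previous:
            # Step 5: first time we see this device, just flag it.
            states[dev] = "CHECKING"
            continue
        if previous[dev] != "CHECKING" or not allow_mount:
            # Nothing to do on this pass, carry the old state over.
            states[dev] = previous[dev]
            continue
        # Step 6: seen before and mounting is allowed, so try to mount.
        if not mount(dev):
            states[dev] = "ERROR"
            continue
        try:
            # Step 7: writability check and profile name read happen here.
            probe(dev)
            states[dev] = "READY"
        finally:
            unmount(dev)
    return states
```

Note that under this reading a newly plugged device needs two passes before anything is mounted: one to enter CHECKING and one to actually get probed.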
import os
import re
import json
import time
import traceback

SYSBLOCK = "/sys/block"
lastdevices = ""


def log(decorator, *args, **kwargs):
    # Move a leading newline in front of the decorator so the tag stays on the message line.
    if args[0].startswith("\n"):
        decorator = f"\n{decorator}"
        args = list(args)
        args[0] = args[0].lstrip()
    return print(decorator, f"[{time.perf_counter():.6f}]", *args, **kwargs)


def info(*args, **kwargs):
    return log("[*]", *args, **kwargs)


def warn(*args, **kwargs):
    return log("[!]", *args, **kwargs)


def usb_storage_device_changed():
    global lastdevices
    info("\nEntered usb_storage_device_changed")
    devices = os.listdir(SYSBLOCK)
    info("detected devices:", devices)
    thisdevices = ""
    for device in devices:
        devpath = f"{SYSBLOCK}/{device}"
        try:
            stat_buffer = os.stat(devpath)
        except Exception:
            warn(f"Error while running stat() syscall to {devpath}.")
            traceback.print_exc()
            continue
        # Build a comma-separated inode list to compare against the previous run.
        thisdevices += f"{stat_buffer.st_ino},"
    changed = thisdevices != lastdevices
    info("thisdevices:", thisdevices)
    info("lastdevices:", lastdevices)
    if changed:
        info("Change in USB storage devices detected.")
    lastdevices = thisdevices
    info("Finished usb_storage_device_changed\n")
    return changed


def get_usb_storage_devices():
    info("\nEntered get_usb_storage_devices")
    usbs = []
    for device in os.listdir(SYSBLOCK):
        syspath = f"{SYSBLOCK}/{device}"
        info(f"\nChecking {syspath}:", os.listdir(syspath))
        removable_path = f"{syspath}/removable"
        if os.path.exists(removable_path):
            with open(removable_path) as f:
                if int(f.read()) != 1:
                    warn(f"Device {syspath} isn't removable, skipping")
                    continue
        else:
            warn(f"Device {syspath} doesn't have removable file??? skipping")
            continue  # the message says "skipping", so actually skip it
        queue_path = f"{syspath}/queue"
        info(f"Start waiting for {queue_path} to come alive")
        start_time = time.perf_counter()
        # Poll every 0.01 seconds for up to 5 seconds.
        while True:
            now = time.perf_counter()
            print(f"\r{now - start_time}")
            if now - start_time > 5:
                warn(f"Timed out waiting for {queue_path}")
                break
            if not os.path.exists(syspath):
                warn(f"{syspath} disappeared lmao")
                break
            if os.path.exists(queue_path):
                info(f"{queue_path} now exists!")
                break
            time.sleep(0.01)
        info("Running `udevadm settle`...")
        info("success" if os.system("udevadm settle") == 0 else "failed?")
        info(f"{syspath}:", os.listdir(syspath))
        info("/dev/disk/by-path:", os.listdir("/dev/disk/by-path"))
        part_path = f"{syspath}/{device}1"
        info(f"Checking if {part_path} is a thing...")
        if os.path.exists(part_path):
            info(f"Partition exists! using /dev/{device}1")
            devpath = f"/dev/{device}1"
        else:
            warn(f"Partition does not exist. using /dev/{device}")
            devpath = f"/dev/{device}"
        device_link = f"{syspath}/device"
        info(f"Canonical path for {device_link}:", os.path.realpath(device_link))
        usbs.append({
            "syspath": syspath,
            "device": devpath,
        })
    info(json.dumps(usbs))
    with open("/etc/fstab") as f:
        for line in f.readlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            fields = line.split()
            if len(fields) < 2:
                continue  # malformed entry, nothing to match against
            scsi_device, mount_point = fields[:2]
            info(f"{scsi_device} {mount_point}")
            # by-path entries are symlinks, so resolve them to the real /dev node.
            underlying_device = os.path.realpath(scsi_device)
            if not os.path.exists(underlying_device):
                warn(f"{scsi_device} doesn't exist, skipping")
                continue
            for usb in usbs:
                if usb['device'] == underlying_device:
                    print(f"Found a match! {usb['device']} : {mount_point}")
                    usb['mountdir'] = mount_point
                    break
    # Keep only the devices with a known mount dir, like step 3.7 in the notes.
    usbs_matchonly = [x for x in usbs if x.get("mountdir")]
    info(json.dumps(usbs_matchonly))
    info("Finished get_usb_storage_devices\n")
    return usbs_matchonly


def main():
    while True:
        if usb_storage_device_changed():
            get_usb_storage_devices()
        time.sleep(0.1)


if __name__ == "__main__":
    main()
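
To poke at the suspicion from the notes that /queue showing up does not actually mean the partition scan is done, one option is to record when each sysfs path first appears and compare the timestamps. This is only a sketch: `first_seen_times` is a made-up helper, and the `sdb` paths in the usage note are placeholders for whatever device you are testing with.

```python
import os
import time


def first_seen_times(paths, timeout=5.0, poll=0.01):
    """Poll until every path exists or the timeout elapses.

    Returns {path: seconds since start when it was first seen, or None}.
    """
    start = time.perf_counter()
    seen = {path: None for path in paths}
    while True:
        elapsed = time.perf_counter() - start
        for path in paths:
            if seen[path] is None and os.path.exists(path):
                seen[path] = elapsed
        if all(t is not None for t in seen.values()) or elapsed > timeout:
            return seen
        time.sleep(poll)
```

Calling `first_seen_times(["/sys/block/sdb/queue", "/sys/block/sdb/sdb1"])` right after plugging the stick in would show whether the partition folder lags behind the queue folder. If it does, checking for the partition immediately after the queue wait could pick /dev/sdb instead of /dev/sdb1.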