Skip to content

Instantly share code, notes, and snippets.

@AstraLuma
Created April 30, 2020 19:09
Show Gist options
  • Save AstraLuma/638d130119a7ee28f1050d396d0bd339 to your computer and use it in GitHub Desktop.
Save AstraLuma/638d130119a7ee28f1050d396d0bd339 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Launches and manages uMTP-Responder with better cleanup.
"""
import contextlib
import os
from pathlib import Path
import subprocess
import tempfile
import time
@contextlib.contextmanager
def mount(src, dst, *params):
    """
    Mount src on dst for the lifetime of the context.

    Any extra positional params are appended verbatim to the ``mount``
    command line. Yields dst wrapped in a Path; on exit (normal or via an
    exception) dst is unmounted again. Either command failing raises
    subprocess.CalledProcessError.
    """
    mount_cmd = ['mount', src, dst]
    mount_cmd.extend(params)
    print(f"mount {dst}")
    subprocess.run(mount_cmd, check=True)
    try:
        yield Path(dst)
    finally:
        # Runs even when the body raised, so the mount never outlives us.
        print(f"umount {dst}")
        subprocess.run(['umount', dst], check=True)
# NOTE: these operations are specifically tailored for working under ConfigFS,
# because it alters some of the fine details that affect algorithms for cleaning
# up
@contextlib.contextmanager
def mkdir(dirpath, **params):
    """
    Makes and cleans up a directory.

    dirpath may be a Path or anything Path() accepts (such as a str); it is
    coerced to a Path, the same way mount() treats its destination. Keyword
    params are forwarded to Path.mkdir(). Yields the directory as a Path
    and removes it with rmdir() when the context exits, whether or not the
    body raised.
    """
    # Coerce up front so both creation and cleanup work for str arguments.
    dirpath = Path(dirpath)
    print(f"mkdir {dirpath}")
    dirpath.mkdir(**params)
    try:
        yield dirpath
    finally:
        print(f"rmdir {dirpath}")
        dirpath.rmdir()
@contextlib.contextmanager
def symlink(src, dst):
    """
    Makes (and cleans up) a symlink at dst pointing to src

    Both arguments may be str or path-like objects. Yields dst unchanged
    and removes the link again when the context exits, whether or not the
    body raised.
    """
    print(f"ln {dst} {src}")
    os.symlink(src, dst)
    try:
        yield dst
    finally:
        print(f"unlink {dst}")
        # os.unlink takes str as well as path-like objects, so cleanup works
        # for every dst that os.symlink accepted above.
        os.unlink(dst)
@contextlib.contextmanager
def popen(cmd, **params):
    """
    Starts a subprocess and stops it again on exit.

    cmd and the keyword params are passed straight to subprocess.Popen.
    Yields the Popen object. When the context exits the process is asked to
    terminate and then waited for, so it is reaped and has actually gone
    away before any later cleanup step runs. If it has not exited after
    10 seconds it is killed.
    """
    daemon = subprocess.Popen(cmd, **params)
    try:
        yield daemon
    finally:
        # terminate() only sends the signal; without wait() the child may
        # still be running (and is never reaped) when we return.
        daemon.terminate()
        try:
            daemon.wait(timeout=10)
        except subprocess.TimeoutExpired:
            daemon.kill()
            daemon.wait()
# Script body: build a USB gadget through ConfigFS, mount FunctionFS for it,
# run umtprd, bind the gadget to a USB device controller, and wait for the
# daemon to exit. Every resource is registered on one ExitStack so that it is
# torn down in reverse order of creation, even on errors.
subprocess.run(['modprobe', 'libcomposite'], check=True)
with contextlib.ExitStack() as context_stack:
    ctx = context_stack.enter_context
    working = Path(ctx(tempfile.TemporaryDirectory()))
    cfg = ctx(mkdir(working / 'cfg'))
    # NOTE: on raspbian this is already mounted at /sys/kernel/config
    # but idk how common this is
    ctx(mount('none', cfg, '-t', 'configfs'))
    # Has usb_gadget
    g1 = ctx(mkdir(cfg / 'usb_gadget' / 'g1'))
    # Pre-populated with:
    # * ./g1/os_desc
    # * ./g1/os_desc/qw_sign
    # * ./g1/os_desc/b_vendor_code
    # * ./g1/os_desc/use
    # * ./g1/strings
    # * ./g1/configs
    # * ./g1/functions
    # * ./g1/UDC
    # * ./g1/bcdUSB
    # * ./g1/bcdDevice
    # * ./g1/idProduct
    # * ./g1/idVendor
    # * ./g1/bMaxPacketSize0
    # * ./g1/bDeviceProtocol
    # * ./g1/bDeviceSubClass
    # * ./g1/bDeviceClass
    c1 = ctx(mkdir(g1 / 'configs' / 'c.1'))
    # Creates:
    # * configs/c.1/strings
    # * configs/c.1/bmAttributes
    # * configs/c.1/MaxPower
    ctx(mkdir(g1 / 'functions' / 'ffs.mtp'))
    # Creates nothing additional
    strings = ctx(mkdir(g1 / 'strings' / '0x409'))
    # Creates:
    # * strings/0x409/serialnumber
    # * strings/0x409/product
    # * strings/0x409/manufacturer
    confstrs = ctx(mkdir(c1 / 'strings' / '0x409'))
    # TODO

    # Device identity and descriptor strings.
    (g1 / 'idProduct').write_text('0x0100')
    (g1 / 'idVendor').write_text('0x1D6B')
    (strings / 'serialnumber').write_text("01234567")
    (strings / 'manufacturer').write_text("PursuedPyBear")
    (strings / 'product').write_text("GameBear")
    (confstrs / 'configuration').write_text('Conf 1\n')
    (c1 / 'MaxPower').write_text('120')

    # Attach the FunctionFS function to configuration c.1.
    ctx(symlink(
        g1 / 'functions' / 'ffs.mtp',
        c1 / 'ffs.mtp',
    ))

    ffs = ctx(mkdir(Path('/dev/ffs-mtp')))
    ctx(mount('mtp', ffs, '-t', 'functionfs'))

    print("start umtprd")
    daemon = ctx(popen(['umtprd'], cwd=g1))
    # Presumably gives umtprd time to set up its FunctionFS endpoints before
    # the gadget is bound -- TODO confirm whether a readiness check exists.
    time.sleep(1)

    print("# build udc")
    # The UDC attribute takes the name of a single controller, so pick one
    # deterministically instead of writing every name joined by newlines.
    udc_names = sorted(item.name for item in Path('/sys/class/udc').iterdir())
    if not udc_names:
        raise RuntimeError("no USB device controller found in /sys/class/udc")
    (g1 / 'UDC').write_text(udc_names[0])

    retcode = daemon.wait()
    if retcode:
        raise RuntimeError(f"umtprd failed with retcode={retcode}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment