@ubergeek42
Created February 17, 2024 17:05
USB Drive Imaging Tools for ICPC Contests

Warning: I have not tested these scripts on anything other than my own machine. Take care when running them, as they can wipe the wrong disk if you are not careful. You've been warned.

Imaging Guide

To speed up imaging, instead of using dd to copy the whole disk (including blocks that are just full of zeroes), we can use partclone, which copies only the blocks that are actually in use. This is a two-step process: first we convert the image into something partclone knows how to use, and then we restore it to our disks.
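
To get a rough feel for how much a plain dd copy wastes, the sketch below counts how many fixed-size blocks of a file are entirely zero. It is only an illustration: partclone decides what to copy from the filesystem's own allocation data rather than by scanning for zeroes, and `zero_block_fraction` and its 4096-byte default are names and values made up for this example.

```python
def zero_block_fraction(path, block_size=4096):
    """Return the fraction of block_size chunks in `path` that contain only zero bytes."""
    zero_block = bytes(block_size)
    total = 0
    zeros = 0
    with open(path, "rb") as f:
        while True:
            chunk = f.read(block_size)
            if not chunk:
                break
            total += 1
            # A short final chunk is compared against a zero run of its own length.
            if chunk == zero_block[:len(chunk)]:
                zeros += 1
    return zeros / total if total else 0.0


if __name__ == "__main__":
    import sys
    print(f"{zero_block_fraction(sys.argv[1]):.1%} of blocks are all zeroes")
```

A high percentage suggests the image has a lot of empty space that a block-for-block copy would still write out.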

Prerequisites

apt-get install partclone fdisk mount

Preparing the image

Once you've downloaded a contest image, you must first prepare it. This process runs partclone to create optimized images of the partitions, basically only including the blocks that are used.

python3 prepare_image.py midatlantic-2024-02-16_image-amd64.img

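This should take roughly 20 minutes and creates several new files next to the image: the partition table dumps (.sfdisk and .sfdisk.json), the first 1MiB of the disk (.bootbits), and one file per partition (.pN.dd or .pN.partclone.*), plus the partclone log files.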
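
As a quick sanity check after preparing, something like the following can confirm that the outputs written for every image are present. This is a minimal sketch based on the file names used in prepare_image.py; `missing_common_outputs` is a hypothetical helper, and it deliberately ignores the per-partition files because their names depend on the partition layout.

```python
from pathlib import Path

# Suffixes that prepare_image.py appends to the image name regardless of layout.
COMMON_SUFFIXES = ("sfdisk", "sfdisk.json", "bootbits")


def missing_common_outputs(image):
    """Return the expected '<image>.<suffix>' paths that do not exist yet."""
    expected = [f"{image}.{suffix}" for suffix in COMMON_SUFFIXES]
    return [name for name in expected if not Path(name).exists()]


if __name__ == "__main__":
    import sys
    missing = missing_common_outputs(sys.argv[1])
    print("All common outputs present" if not missing else f"Missing: {missing}")
```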

Imaging a device

Use the included restore_image.sh script to re-assemble these parts back onto a flash drive. Edit the IMAGE_BASE line (line 14) to match the name of the image you ran the prepare script on, then invoke it as something like:

./restore_image.sh /dev/sde # Replacing /dev/sde with your usb flash drive

On my system, using USB3 ports and a USB3 drive (SanDisk Cruzer Ultra Flair 32GB), this restore process takes a little under 15 minutes.
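
restore_image.sh refuses to write to anything 40GiB or larger, as a guard against picking the wrong disk. The same check can be sketched in Python by reading the device size from sysfs, which reports it in 512-byte units. The function names and the `sys_root` parameter here are assumptions made for illustration; the script itself uses lsblk.

```python
from pathlib import Path

DISK_LIMIT = 40 * 1024**3  # 40GiB, the same threshold restore_image.sh uses


def device_size_bytes(device, sys_root="/sys/class/block"):
    """Read the size of a block device such as /dev/sde from sysfs."""
    name = Path(device).name
    sectors = int((Path(sys_root) / name / "size").read_text())
    return sectors * 512  # sysfs sizes are always in 512-byte units


def looks_like_flash_drive(size_bytes, limit=DISK_LIMIT):
    """True only for a non-empty device strictly smaller than the limit."""
    return 0 < size_bytes < limit
```

For example, `looks_like_flash_drive(device_size_bytes("/dev/sde"))` would have to be true before going anywhere near dd.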

Advanced mode

WARNING: Here be dragons. I make no claims that this will work anywhere else; I don't have any other systems to test on.

Because I'm lazy, and figuring out the drive id and invoking the commands manually is a pain (I typically have to image 100+ usb drives), I have some other scripts that use udev to watch for usb hotplug events and trigger the scripts above automatically.

The basic process is to figure out what your usb ports map to in udev (see detect_ports.py), and then run a script that watches a specific port and runs a command whenever a device is plugged into it.

These python scripts require pyudev, which seems like it's available as apt-get install python3-pyudev, but I can't be certain as I use a virtualenv for it.

detect_ports.py

Invoking this script will watch for all usb hotplug events, and print out some path identifier. So the way to use this is to start the script, then plug a usb drive into each port you want to use, then wait for it to print the identifier out.

For example, the front 4 ports on my desktop output the following:

$ python detect_ports.py 
Kingston DataTraveler 3.0 at /dev/sde (port /devices/pci0000:00/0000:00:14.0/usb3/3-1/3-1:1.0/host6/target6:0:0/6:0:0:0/block/sde)
Kingston DataTraveler 3.0 at /dev/sde (port /devices/pci0000:00/0000:00:14.0/usb3/3-2/3-2:1.0/host6/target6:0:0/6:0:0:0/block/sde)
Kingston DataTraveler 3.0 at /dev/sde (port /devices/pci0000:00/0000:00:14.0/usb2/2-6/2-6:1.0/host6/target6:0:0/6:0:0:0/block/sde)
Kingston DataTraveler 3.0 at /dev/sde (port /devices/pci0000:00/0000:00:14.0/usb2/2-5/2-5:1.0/host6/target6:0:0/6:0:0:0/block/sde)
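
The vendor and model names in this output come from udev's ID_VENDOR_ENC and ID_MODEL_ENC properties, which escape spaces and other unsafe bytes as `\xNN`. The scripts undo that with a latin1/unicode_escape round trip; the sketch below is an alternative that replaces the escapes byte by byte and then decodes the result as UTF-8. `decode_udev_enc` is a hypothetical helper, not something pyudev provides.

```python
import re

_ESCAPE = re.compile(rb"\\x([0-9a-fA-F]{2})")


def decode_udev_enc(value):
    """Turn udev-style \\xNN escapes back into the characters they stand for."""
    raw = _ESCAPE.sub(lambda m: bytes([int(m.group(1), 16)]), value.encode("utf-8"))
    return raw.decode("utf-8", errors="replace")


print(decode_udev_enc(r"DataTraveler\x203.0"))
```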

clonedisk.py

Take the identifiers from detect_ports.py, strip the trailing part of each path (I'm not 100% sure what the rule is for how much to cut off, but see the script for examples), and add the result to the portmapping dictionary at the top of clonedisk.py. The entries are used as prefix matches, so don't trim too much.
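
One possible trimming rule, offered as an assumption rather than a tested recipe: cut the path just before the USB interface directory (the component that looks like `3-1:1.0`), and keep the trailing slash so that port `3-1` cannot accidentally match `3-10`. `trim_to_port` is a hypothetical helper written for this sketch.

```python
import re

# USB interface directories in sysfs are named "<bus>-<port>[.<port>...]:<config>.<interface>",
# e.g. "3-1:1.0". Everything before that component identifies the physical port.
_USB_INTERFACE = re.compile(r"/\d+-\d+(?:\.\d+)*:\d+\.\d+/")


def trim_to_port(devpath):
    """Cut a udev device path just before its USB interface component."""
    match = _USB_INTERFACE.search(devpath)
    if match is None:
        raise ValueError(f"no USB interface component in {devpath!r}")
    return devpath[:match.start() + 1]  # keep the trailing slash


path = "/devices/pci0000:00/0000:00:14.0/usb3/3-1/3-1:1.0/host6/target6:0:0/6:0:0:0/block/sde"
print(trim_to_port(path))
```

For the first line of the example output above, this gives the same prefix that is used for Port_1 in clonedisk.py.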

Then you can invoke clonedisk.py (as root) with an argument like python clonedisk.py Port_2 $PWD/restore_image.sh. Plug a drive into that port, and watch as it kicks off the restore_image script. I usually add some more output to the restore_image script to make it clear when the drive is finished.

With a usb hub, I'm usually able to do about 8 drives at a time, and finish imaging all 100+ drives over the course of a day or so.

#!/usr/bin/env python
# clonedisk.py: watch one usb port via udev and run a script on every disk plugged into it
import pyudev
import time
import argparse
import threading
import subprocess
from queue import Queue
from datetime import datetime

portmapping = {
    # detect_ports.py output:
    # Kingston DataTraveler 3.0 at /dev/sde (port /devices/pci0000:00/0000:00:14.0/usb3/3-1/3-1:1.0/host6/target6:0:0/6:0:0:0/block/sde)
    # Kingston DataTraveler 3.0 at /dev/sde (port /devices/pci0000:00/0000:00:14.0/usb3/3-2/3-2:1.0/host6/target6:0:0/6:0:0:0/block/sde)
    # Kingston DataTraveler 3.0 at /dev/sde (port /devices/pci0000:00/0000:00:14.0/usb2/2-6/2-6:1.0/host6/target6:0:0/6:0:0:0/block/sde)
    # Kingston DataTraveler 3.0 at /dev/sde (port /devices/pci0000:00/0000:00:14.0/usb2/2-5/2-5:1.0/host6/target6:0:0/6:0:0:0/block/sde)
    # Trim them down to something like this, they end up used as a prefix-match so don't trim too much.
    # Front panel ports, from left to right
    '/devices/pci0000:00/0000:00:14.0/usb3/3-1/': 'Port_1',
    '/devices/pci0000:00/0000:00:14.0/usb3/3-2/': 'Port_2',
    '/devices/pci0000:00/0000:00:14.0/usb2/2-6/2-6': 'Port_3',
    '/devices/pci0000:00/0000:00:14.0/usb2/2-5/2-5': 'Port_4',
}

work_queue = Queue()
port = None
script = None


def image_port():
    # Worker thread: run the imaging script once for each queued device
    print('Imaging thread started!')
    while True:
        task = work_queue.get()
        prt = task['port']
        dev = task['device']
        info = task['info']
        print(f'Starting imaging {dev} on {prt} ({info})')
        start_time = datetime.now()
        print(f'Launching script: {script} {dev}')
        # print(f'Waiting 10s...')
        # time.sleep(10)
        status = subprocess.call([script, dev])
        if status != 0:
            print(f'Imaging process failed with status: {status}')
        end_time = datetime.now()
        duration = end_time - start_time
        hours, remainder = divmod(int(duration.total_seconds()), 3600)  # Get Hour
        minutes, seconds = divmod(remainder, 60)  # Get Minute & Second
        print(f'Finished imaging {dev} on {prt} in {hours:02}:{minutes:02}:{seconds:02}')


def device_changed_callback(device):
    devpath = device.device_path
    devtype = device.get('DEVTYPE')
    if devtype != 'disk':
        return
    devname = device.get('DEVNAME')  # same as device.device_node?
    # The *_ENC properties escape characters such as spaces as \xNN; undo that for display
    model = device.get('ID_MODEL_ENC', 'Unknown Model').encode('latin1', errors='backslashreplace').decode('unicode_escape')
    vendor = device.get('ID_VENDOR_ENC', 'Unknown Vendor').encode('latin1', errors='backslashreplace').decode('unicode_escape')

    dev_port = None
    for port_devpath, port_name in portmapping.items():
        if devpath.startswith(port_devpath):
            dev_port = port_name
            break

    # nothing to do if this isn't the port we're monitoring
    if dev_port != port:
        return

    if device.action == 'add':
        print(f'{port}: {vendor} {model} at {devname} (port {device.device_path})')
        time.sleep(2)  # wait 2s for it to settle
        work_queue.put({
            'port': port,
            'device': devname,
            'info': f'{vendor} {model}'
        })


def launch_threads():
    # start a thread that images devices
    imagethread = threading.Thread(target=image_port, daemon=True)
    imagethread.start()

    # monitor udev
    context = pyudev.Context()
    monitor = pyudev.Monitor.from_netlink(context)
    monitor.filter_by(subsystem='block', device_type='disk')
    observer = pyudev.MonitorObserver(monitor, callback=device_changed_callback)
    observer.start()

    # block waiting for the imagethread to finish, which it never will
    imagethread.join()


def main():
    global port, script
    parser = argparse.ArgumentParser(description='Monitor a usb port via udev and clone images to it')
    parser.add_argument('port', type=str, help='Port to monitor (e.g. Port_1)')
    parser.add_argument('script', type=str, help='Script to run when a device is inserted')
    args = parser.parse_args()
    port = args.port
    script = args.script
    print(f'Monitoring port: {port}')
    print(f'Will run the following command whenever a device is inserted: {script} /dev/sd?')
    launch_threads()


if __name__ == "__main__":
    main()

#!/usr/bin/env python
# detect_ports.py: print the udev device path of every disk that gets plugged in
import pyudev


def device_changed_callback(device):
    devtype = device.get('DEVTYPE')
    if devtype != 'disk':
        return
    devname = device.get('DEVNAME')  # same as device.device_node?
    # The *_ENC properties escape characters such as spaces as \xNN; undo that for display
    model = device.get('ID_MODEL_ENC', 'Unknown Model').encode('latin1', errors='backslashreplace').decode('unicode_escape')
    vendor = device.get('ID_VENDOR_ENC', 'Unknown Vendor').encode('latin1', errors='backslashreplace').decode('unicode_escape')
    if device.action == 'add':
        print(f'{vendor} {model} at {devname} (port {device.device_path})')


def launch_threads():
    # monitor udev
    context = pyudev.Context()
    monitor = pyudev.Monitor.from_netlink(context)
    monitor.filter_by(subsystem='block', device_type='disk')
    observer = pyudev.MonitorObserver(monitor, callback=device_changed_callback)
    observer.start()
    # block forever; the observer thread keeps running until the process is killed
    observer.join()


def main():
    launch_threads()


if __name__ == "__main__":
    main()

#!/usr/bin/env python3
# prepare_image.py: split a disk image into per-partition dd/partclone files
from subprocess import PIPE, Popen
import json
import sys
from pathlib import Path
import atexit


def run_foreground(cmd):
    # Run a command with its output attached to the terminal
    p = Popen(cmd, shell=True)
    p.wait()
    if p.returncode != 0:
        print(f"Error running {cmd}")
        return False
    return True


def run_cmd(cmd, ignore_errors=False):
    # Run a command and capture its output
    p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    out, err = p.communicate()
    if p.returncode != 0 and not ignore_errors:
        print(
            f"""Command: {cmd}
Code: {p.returncode}
Out:
{out}
Err:
{err}
"""
        )
    return (p.returncode, out, err)


def cleanup_loopback(dev):
    print(f"detaching loopback device {dev}")
    run_cmd(f"losetup --detach {dev}")


if len(sys.argv) != 2:
    print(f"Usage: {sys.argv[0]} IMAGE")
    sys.exit(1)
image = sys.argv[1]

# Fail early if sfdisk is not installed
(ret, out, err) = run_cmd('sfdisk --version')
if ret != 0:
    sys.exit(1)
(ret, out, err) = run_cmd("sfdisk --dump " + image + ' > ' + f"{image}.sfdisk")
(ret, out, err) = run_cmd("sfdisk --dump --json " + image + ' > ' + f"{image}.sfdisk.json")

print("creating loopback device for image")
(ret, out, err) = run_cmd("losetup --show --partscan -f " + image)
if ret != 0:
    sys.exit(1)
loopback_dev = out.decode("utf-8").strip()
# Detach the loopback device however the script exits
atexit.register(lambda: cleanup_loopback(loopback_dev))
print(f" got loopback device {loopback_dev}")

sfdisk_contents = Path(f'{image}.sfdisk.json').read_text()
sfdisk = json.loads(sfdisk_contents)
partitions = sfdisk['partitiontable']['partitions']
print(f"Found {len(partitions)} partitions")

# Mapping values
fstypes = {
    "21686148-6449-6E6F-744E-656564454649": "dd",  # bios boot partition
    "C12A7328-F81F-11D2-BA4B-00A0C93EC93B": "partclone.vfat",  # esp partition, which is fat
    "EBD0A0A2-B9E5-4433-87C0-68B6B72699C7": "detect",  # "Basic data partition"
    "0FC63DAF-8483-4772-8E79-3D69D8477DE4": "detect",  # "linux filesystem partition"
}

commands = [
    # First 1MiB has important grub bits for booting
    f"dd status=progress if={loopback_dev} of={image}.bootbits bs=512 count=2048",
]

for i, p in enumerate(partitions):
    print(f"Partition {i} is of type {p['type']}")
    clone_program = fstypes.get(p['type'])
    if clone_program is None:
        print(f"Unhandled partition type: {p['type']}")
        sys.exit(1)
    if clone_program == "detect":
        (ret, out, err) = run_cmd(f"blkid -o value -s TYPE {loopback_dev}p{i+1}")
        fstype = out.decode("utf-8").strip()
        if fstype == "vfat":
            clone_program = "partclone.vfat"
        elif fstype == "ext4":
            clone_program = "partclone.extfs"
        else:
            print(f"Unhandled fstype: {fstype}")
            # the atexit handler detaches the loopback device
            sys.exit(1)
    n = i+1
    if clone_program == "dd":
        commands.append(f"dd status=progress if={loopback_dev}p{n} of={image}.p{n}.dd")
    else:
        # --direct-io if partclone version is >= 0.3.26 (ubuntu 20.04 is 0.3.13)
        commands.append(f"{clone_program} --logfile {image}.p{n}.{clone_program}.log --clone --ncurses --source {loopback_dev}p{n} --output {image}.p{n}.{clone_program}")
    print(f" cloning with {clone_program}")

print("Analyzed. Will execute the following commands to create image files: ")
print()
for c in commands:
    print(c)

go = input("Continue (only 'yes' will be accepted)? ")
if go != "yes":
    print("Exiting...")
    sys.exit(1)
print()

for c in commands:
    print(f"Running: {c}")
    # stop at the first failure rather than leaving a partial set of image files
    if not run_foreground(c):
        sys.exit(1)

#!/bin/bash

DEVICE=$1
# Sanity check to ensure we don't try to overwrite some important disk we have...
DISK_LIMIT=$((40*1024*1024*1024)) # 40GiB
DISK_SIZE=$(lsblk -b --output SIZE -n -d "$DEVICE")
echo "$DISK_SIZE"
if [[ -z "$DISK_SIZE" || "$DISK_SIZE" -ge "$DISK_LIMIT" ]]; then
    echo "Disk size unknown or too large ($DISK_SIZE >= $DISK_LIMIT), bailing because this is probably a mistake"
    exit 1
fi

# CHANGE THIS LINE IF NEEDED to match the image you downloaded
IMAGE_BASE="midatlantic-2024-02-16_image-amd64.img"

set -x
dd if="./${IMAGE_BASE}.bootbits" of="${DEVICE}"
sfdisk "$DEVICE" < "./${IMAGE_BASE}.sfdisk"
dd if="./${IMAGE_BASE}.p1.dd" of="${DEVICE}1"
# throw a sleep 20 in here so all other instances running can get the partition table sorted before getting stuck waiting for sync
sleep 20
partclone.vfat -L /tmp/restore_log.$$ --restore --source "./${IMAGE_BASE}.p2.partclone.vfat" --output "${DEVICE}2"
partclone.vfat -L /tmp/restore_log.$$ --restore --source "./${IMAGE_BASE}.p3.partclone.vfat" --output "${DEVICE}3"
partclone.extfs -L /tmp/restore_log.$$ --restore --source "./${IMAGE_BASE}.p4.partclone.extfs" --output "${DEVICE}4"
set +x