Skip to content

Instantly share code, notes, and snippets.

@RavuAlHemio
Last active May 2, 2022 06:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RavuAlHemio/ca0f7afc753f63895b69cc22dc9881b8 to your computer and use it in GitHub Desktop.
Save RavuAlHemio/ca0f7afc753f63895b69cc22dc9881b8 to your computer and use it in GitHub Desktop.
Automates plugging USB drives backed by a VHDX image into a QEMU VM and unplugging them again.
# Starts the QEMU i386 VM that the companion Python script controls.
# The -qmp option opens a QMP server on tcp:127.0.0.1:6969 (server mode, wait=off);
# the Python script connects to that same address and port.
# NOTE(review): -device "usb-ehci" presumably provides the USB controller that the
# hot-plugged usb-storage device attaches to -- confirm.
# Comments are kept above the command because every option line ends in a
# backtick line continuation.
& "C:\Program Files\qemu\qemu-system-i386.exe" `
-cpu pentium3 `
-m 1024 `
-vga cirrus `
-drive "if=ide,bus=0,unit=0,media=disk,file=w2k_de.qcow2" `
-drive "if=ide,bus=1,unit=0,media=cdrom" `
-device sb16 `
-device "usb-ehci" `
-device "usb-tablet" `
-qmp "tcp:127.0.0.1:6969,server,wait=off" `
-nic none `
-rtc "base=localtime"
#!/usr/bin/env python3
#
# automates inserting and removing USB drives backed by a VHDX image from a QEMU VM
#
import argparse
import json
import socket
from typing import Any
def recv_json(sock: socket.socket) -> Any:
    """Read exactly one JSON document from *sock* and return it decoded.

    The stream is consumed one byte at a time so that nothing belonging to
    the next document is read. Only as much JSON structure as is needed to
    find the end of the document is tracked: nesting of arrays and objects,
    string boundaries and backslash escapes inside strings. The collected
    bytes are then decoded as UTF-8 and handed to ``json.loads``.

    Leading whitespace is skipped. A top-level array, object or string is
    read completely; any other top-level value ends the document after its
    first byte, so callers are expected to exchange objects or arrays.

    Raises:
        ConnectionError: the peer closed the connection before a complete
            document was received.
        ValueError: a closing bracket does not match the innermost open
            construct, or the collected text is not valid JSON.
    """
    bs_list: list[bytes] = []
    # innermost-last stack of open constructs: '[', '{' or '"'
    op_stack: list[str] = []
    # number of upcoming bytes that belong to a backslash escape
    escape_level = 0
    anything_read = False

    while True:
        b = sock.recv(1)
        if not b:
            # recv() returns b"" on EOF; without this check the loop would
            # spin forever once the peer has gone away
            raise ConnectionError(
                "connection closed before a complete JSON document was received"
            )
        bs_list.append(b)

        if not anything_read and b.isspace():
            # skip whitespace in front of the document
            continue
        anything_read = True

        if escape_level > 0:
            if escape_level == 1 and b == b"u":
                # Unicode escape; four more characters follow
                escape_level = 4
            else:
                escape_level -= 1
            continue

        if op_stack and op_stack[-1] == '"':
            # reading a string
            if b == b"\\":
                # escape starts
                escape_level = 1
                continue
            if b != b'"':
                # ordinary string content, including any brackets or braces
                continue
            # end of string
            op_stack.pop()
        elif b == b'"':
            # string start; brackets inside it must not affect nesting
            op_stack.append('"')
        elif b in (b"[", b"{"):
            # list or object start
            op_stack.append(b.decode("ascii"))
        elif b in (b"]", b"}"):
            # list or object end; must match the innermost open construct
            expected = "[" if b == b"]" else "{"
            if not op_stack:
                raise ValueError(f"closing {expected!r} with nothing open")
            op = op_stack.pop()
            if op != expected:
                raise ValueError(f"closing {expected!r} with {op!r}")

        if not op_stack:
            # nothing left on the stack: this is the end of the JSON document
            break

    # collect bytes, decode as UTF-8, load as JSON
    json_str = b"".join(bs_list).decode("utf-8")
    return json.loads(json_str)
def send_json(sock: socket.socket, val: Any):
    """Serialize *val* as JSON and write it to *sock* as UTF-8.

    ``sendall`` is used instead of ``send`` because ``send`` may transmit
    only part of the buffer, which would leave the peer with a truncated
    document.
    """
    sock.sendall(json.dumps(val).encode("utf-8"))
def query_cmds(sock: socket.socket) -> Any:
    """Send a ``query-commands`` request on *sock* and return the next JSON document read.

    ``sendall`` is used so that the request cannot be truncated by a partial
    ``send``.
    """
    sock.sendall(b'{"execute":"query-commands"}')
    return recv_json(sock)
def unplug_usb(sock: socket.socket, name: str):
    """Request removal of the device with id *name* and wait until it is gone.

    Sends ``device_del`` and reads messages until the command response (a
    message carrying ``return`` or ``error``) arrives, then keeps reading
    until a ``DEVICE_DELETED`` event for *name* has been seen. Every message
    received is printed.

    A ``DeviceNotFound`` error is treated as "already unplugged" and the
    function returns immediately.

    Raises:
        RuntimeError: ``device_del`` failed with any other error. Waiting
            for ``DEVICE_DELETED`` in that case would block forever.
    """

    def is_deleted_event(msg: Any) -> bool:
        # True if msg is the DEVICE_DELETED event for our device
        if msg.get("event", "") != "DEVICE_DELETED":
            return False
        return msg.get("data", {}).get("device", "") == name

    # remove USB storage device
    usb_stor_del = {
        "execute": "device_del",
        "arguments": {
            "id": name,
        },
    }
    send_json(sock, usb_stor_del)

    usb_unplugged = False

    # read until the command response; asynchronous events may be delivered
    # before it, and one of them may already be the deletion we wait for
    while True:
        result = recv_json(sock)
        print(result)
        if "return" in result or "error" in result:
            break
        if is_deleted_event(result):
            usb_unplugged = True

    error = result.get("error")
    if error is not None:
        if error.get("class") == "DeviceNotFound":
            # oh well -- nothing to unplug
            return
        raise RuntimeError(
            f"device_del for {name!r} failed: {error.get('desc', error)}"
        )

    # await device deletion
    while not usb_unplugged:
        event = recv_json(sock)
        print(event)
        usb_unplugged = is_deleted_event(event)
def unplug_block(sock: socket.socket, name: str):
    """Send ``blockdev-del`` for the block node *name* and print the reply.

    Unlike the USB device removal, no follow-up event is awaited: the next
    JSON document read from *sock* is printed and the function returns.
    """
    request = {"execute": "blockdev-del", "arguments": {"node-name": name}}
    send_json(sock, request)
    # block devices are unplugged immediately, so one reply is all we read
    print(recv_json(sock))
def main():
    """Command-line entry point.

    Takes one positional argument, the mode:

    * ``cmds``   -- print the sorted names returned by ``query-commands``
    * ``plug``   -- add the VHDX-backed block node, then the USB storage
      device that uses it
    * ``unplug`` -- remove the USB storage device, then the block node

    The connection to 127.0.0.1:6969 is opened in a ``with`` block so the
    socket is closed on every exit path, including exceptions.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(dest='mode', choices=("plug", "unplug", "cmds"))
    args = parser.parse_args()

    with socket.create_connection(("127.0.0.1", 6969)) as sock:
        # obtain greeting
        recv_json(sock)

        # negotiate capabilities
        send_json(sock, {"execute": "qmp_capabilities"})
        recv_json(sock)

        if args.mode == "cmds":
            # get commands
            send_json(sock, {"execute": "query-commands"})
            cmds_resp = recv_json(sock)
            for cmd in sorted(c["name"] for c in cmds_resp["return"]):
                print(cmd)
        elif args.mode == "plug":
            # add block device
            block_add = {
                "execute": "blockdev-add",
                "arguments": {
                    "driver": "vhdx",
                    "node-name": "stic2k-blk",
                    "file": {
                        "driver": "file",
                        "filename": "stic2k.vhdx",
                    },
                },
            }
            send_json(sock, block_add)
            print(recv_json(sock))

            # add USB storage device on top of the block node
            usb_stor_add = {
                "execute": "device_add",
                "arguments": {
                    "driver": "usb-storage",
                    "drive": "stic2k-blk",
                    "id": "stic2k-usb",
                },
            }
            send_json(sock, usb_stor_add)
            print(recv_json(sock))
        else:
            assert args.mode == "unplug"
            # the USB device uses the block node, so it has to go first
            unplug_usb(sock, "stic2k-usb")
            unplug_block(sock, "stic2k-blk")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment