Skip to content

Instantly share code, notes, and snippets.

@zyga

zyga/raspi-tool.py

Last active Sep 25, 2019
Embed
What would you like to do?
raspi-tool for sending/receiving images from running Raspberry Pi devices
#!/usr/bin/env python3
import abc
import argparse
import contextlib
import io
import os
import platform
import socket
import subprocess
import sys
import zlib
from typing import Any, Callable, Optional, Sequence, Tuple, Union, List
class ArticulatedValueError(ValueError, argparse.ArgumentTypeError):
"""ArticulatedTypeError is a ValueError that contains an error message
displayed by argparse."""
def parse_host_and_maybe_port(host_port: str) -> Tuple[str, Optional[int]]:
"""
parse_host_and_maybe_port splits host:port into a typed tuple.
The tuple always contains an address but the port number may be None. A
ValueError is raised if the port is provided but does not represent a
valid port number.
"""
parts = host_port.split(":", 1)
host = parts[0]
port = None # type: Optional[int]
if len(parts) == 2:
try:
port = int(parts[1])
except ValueError:
raise ArticulatedValueError("port is not a number")
if 0 > port > 65535:
raise ArticulatedValueError("port not in range 0..65535")
return (host, port)
def parse_host_and_port(host_port: str) -> Tuple[str, int]:
"""
parse_host_and_port splits host:port into a typed tuple.
A ValueError is raised if the format is invalid.
"""
host, maybe_port = parse_host_and_maybe_port(host_port)
if maybe_port is None:
# See note in parse_host_and_maybe_port
raise ArticulatedValueError("port number must be explicitly provided")
return (host, maybe_port)
class Device(int):
"""
Device is a device number with major and minor components.
Note that this class does not attempt to mimic peculiar
encoding used by the Linux kernel.
"""
@classmethod
def pack(cls, major: int, minor: int) -> "Device":
return cls((major << 16) | (minor & (1 << 16) - 1))
def __str__(self) -> str:
return "{}:{}".format(self.major, self.minor)
def __repr__(self) -> str:
return "Device.pack({}, {})".format(self.major, self.minor)
@property
def major(self) -> int:
"""major is the higher 16 bits of the device number."""
return self >> 16
@property
def minor(self) -> int:
"""minor is the lower 16 bits of the device number."""
return self & ((1 << 16) - 1)
class OptionalFields(List[str]):
def __str__(self) -> str:
"""__str__ returns the special formatting of optional fields."""
if len(self):
return " ".join(self) + " -"
else:
return "-"
class MountInfoEntry(object):
"""Single entry in /proc/pid/mointinfo, see proc(5)"""
mount_id: int
parent_id: int
dev: Device
root_dir: str
mount_point: str
mount_opts: str
opt_fields: OptionalFields
fs_type: str
mount_source: str
sb_opts: str
def __init__(self) -> None:
self.mount_id = 0
self.parent_id = 0
self.dev = Device.pack(0, 0)
self.root_dir = ""
self.mount_point = ""
self.mount_opts = ""
self.opt_fields = OptionalFields()
self.fs_type = ""
self.mount_source = ""
self.sb_opts = ""
def __eq__(self, other: object) -> "Union[NotImplemented, bool]":
if not isinstance(other, MountInfoEntry):
return NotImplemented
return (
self.mount_id == other.mount_id
and self.parent_id == other.parent_id
and self.dev == other.dev
and self.root_dir == other.root_dir
and self.mount_point == other.mount_point
and self.mount_opts == other.mount_opts
and self.opt_fields == other.opt_fields
and self.fs_type == other.fs_type
and self.mount_source == other.mount_source
and self.sb_opts == other.sb_opts
)
@classmethod
def parse(cls, line: str) -> "MountInfoEntry":
it = iter(line.split())
self = cls()
self.mount_id = int(next(it))
self.parent_id = int(next(it))
dev_maj, dev_min = map(int, next(it).split(":"))
self.dev = Device((dev_maj << 16) | dev_min)
self.root_dir = next(it)
self.mount_point = next(it)
self.mount_opts = next(it)
self.opt_fields = OptionalFields()
for opt_field in it:
if opt_field == "-":
break
self.opt_fields.append(opt_field)
self.fs_type = next(it)
self.mount_source = next(it)
self.sb_opts = next(it)
try:
next(it)
except StopIteration:
pass
else:
raise ValueError("leftovers after parsing {!r}".format(line))
return self
def __str__(self) -> str:
return (
"{0.mount_id} {0.parent_id} {0.dev} {0.root_dir}"
" {0.mount_point} {0.mount_opts} {0.opt_fields} {0.fs_type}"
" {0.mount_source} {0.sb_opts}"
).format(self)
def __repr__(self) -> str:
return "MountInfoEntry.parse({!r})".format(str(self))
@property
def dev_maj(self) -> int:
return self.dev.major
@property
def dev_min(self) -> int:
return self.dev.minor
class ReMountReadOnly:
"""ReMountReadOnly is a context manager remounting filesystem to read-only."""
def __init__(
self, path: str, on_remount: Optional[Callable[[str, bool], None]] = None
):
self.path = path
self.altered = False
self.on_remount = on_remount
def __enter__(self) -> "Optional[ReMountReadOnly]":
retcode = subprocess.call(["mount", "-o", "remount,ro", self.path])
if retcode == 0:
self.altered = True
if self.on_remount is not None:
self.on_remount(self.path, True)
return self
return None
def __exit__(self, *exc_details: Any) -> None:
if not self.altered:
return
retcode = subprocess.call(["mount", "-o", "remount,rw", self.path])
if retcode == 0 and self.on_remount is not None:
self.on_remount(self.path, False)
class Command(abc.ABC):
"""Command is the interface for command line commands."""
class Args(argparse.Namespace):
"""Args describes the parsed arguments."""
@abc.abstractproperty
def name(self) -> str:
"""name of the command"""
@abc.abstractproperty
def description(self) -> str:
"""description of the command"""
@abc.abstractmethod
def configure_parser(self, parser: argparse.ArgumentParser) -> None:
"""configure_parser configures the parser specific to the command."""
@abc.abstractmethod
def invoke(self, args: Args) -> None:
"""invoke executes the command with the given arguments."""
def say(self, msg: str) -> None:
"""say prints stuff as the command"""
print("raspi-tool", msg)
class SendImgCmd(Command):
name = "send-image"
description = "send an image from this device to a remote machine"
class Args(Command.Args):
device: str
addr: Tuple[str, int]
def configure_parser(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--device",
metavar="DEVICE",
default="/dev/mmcblk0",
help="File representing SD card",
)
parser.add_argument(
"addr",
metavar="HOST:PORT",
type=parse_host_and_port,
help="Destination (e.g. running raspberry-pi-tool recv-image)",
)
def _on_remounted(self, path: str, is_ro: bool) -> None:
self.say("{} re-mounted {}".format(path, "ro" if is_ro else "rw"))
def invoke(self, args: Args) -> None:
with contextlib.ExitStack() as stack:
try:
sink = socket.create_connection(args.addr)
except IOError as exc:
raise SystemExit("cannot connect to {}: {}".format(args.addr, exc))
else:
stack.enter_context(sink)
with open("/proc/self/mountinfo") as stream:
mountinfo = [MountInfoEntry.parse(line) for line in stream]
for entry in mountinfo:
if entry.mount_source.startswith(args.device):
stack.enter_context(
ReMountReadOnly(entry.mount_point, self._on_remounted)
)
try:
source = io.FileIO(args.device)
except IOError as exc:
raise SystemExit("cannot open {}: {}".format(args.device, exc))
else:
stack.enter_context(source)
self.say("sending {} to {}".format(args.device, args.addr))
packer = zlib.compressobj(level=9)
buf = bytearray(4096)
while True:
n = source.readinto(buf)
if n == 0:
break
zbuf = packer.compress(buf[:n])
sink.sendall(zbuf)
del zbuf
zbuf = packer.flush(zlib.Z_FINISH)
sink.sendall(zbuf)
del zbuf
self.say("sent!")
class RecvImgCmd(Command):
name = "recv-image"
description = "receive an image a remote image and save it locally"
class Args(Command.Args):
img: str
addr: Optional[Tuple[str, int]]
def configure_parser(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--addr",
metavar="HOST:PORT",
type=parse_host_and_port,
help="Destination (e.g. running raspberry-pi-tool recv-image)",
)
parser.add_argument(
"--image",
dest="img",
metavar="IMAGE",
help="File representing SD card image",
default="pi.img",
)
def invoke(self, args: Args) -> None:
with contextlib.ExitStack() as stack:
# Open the file we want to write to.
try:
sink = stack.enter_context(io.FileIO(args.img, "w"))
except IOError as exc:
raise SystemExit("cannot open {}: {}".format(args.img, exc))
# Open a socket and find a port to bind to.
sock = stack.enter_context(
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
)
host = "0.0.0.0"
port = 12345
if args.addr is not None:
host, port = args.addr
search = 1
else:
search = 10
for try_port in range(port, port + search):
try:
sock.bind((host, try_port))
except OSError as exc:
self.say("cannot bind to {}:{}: {}".format(host, try_port, exc))
else:
break
else:
raise SystemExit("cannot bind to port, giving up")
# Listen for incoming connections.
sock.listen()
self.say("listening on {}:{}".format(host, port))
hostname = socket.gethostname()
hostaddr = socket.gethostbyname(hostname)
if hostaddr.startswith("127."):
self.say("cannot determine useful address of this machine")
else:
self.say("on the raspberry pi issue the following command")
self.say("$ sudo ./raspi-tool.py send-image {}:{}".format(hostaddr, port))
self.say("waiting for connection...")
# Wait for client connection.
try:
source, remote_addr = sock.accept()
except Exception as exc:
raise SystemExit("cannot accept connection: {}".format(exc))
self.say("saving image from {} to {}".format(remote_addr, args.img))
packer = zlib.decompressobj()
buf = bytearray(4096)
while True:
n = source.recv_into(buf, 0)
if n == 0:
break
zbuf = packer.decompress(buf[:n])
sink.write(zbuf)
del zbuf
zbuf = packer.flush(zlib.Z_FINISH)
sink.write(zbuf)
del zbuf
self.say("saved!")
def _make_parser(commands: Sequence[Command]) -> argparse.ArgumentParser:
"""_make_parser creates an argument parser with given subcommands."""
parser = argparse.ArgumentParser(
description="Utility for working with Raspberry Pi devices."
)
parser.add_argument("--version", action="version", version="0.1")
sub = parser.add_subparsers()
for cmd in commands:
cmd_parser = sub.add_parser(cmd.name, help=cmd.description)
cmd.configure_parser(cmd_parser)
cmd_parser.set_defaults(invoke=cmd.invoke)
return parser
def main() -> None:
parser = _make_parser([SendImgCmd(), RecvImgCmd()])
# On windows, when invoked without arguments, default to receiving an image.
if platform.win32_ver()[0] != "" and len(sys.argv) == 1:
args = parser.parse_args(["recv-image"])
else:
args = parser.parse_args()
if not hasattr(args, "invoke"):
parser.error("select command to execute")
args.invoke(args)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment