Skip to content

Instantly share code, notes, and snippets.

@Rudd-O
Created February 27, 2023 02:59
Show Gist options
  • Save Rudd-O/2d6f071abe343b9098e5f9a749fdc6ca to your computer and use it in GitHub Desktop.
Save Rudd-O/2d6f071abe343b9098e5f9a749fdc6ca to your computer and use it in GitHub Desktop.
(very green) ZFS driver for Qubes volume pools.
"""
Driver for storing qube images in ZFS pool volumes.
"""
import dataclasses
import logging
import os
import subprocess
import time
import asyncio
import qubes
import qubes.storage
import qubes.utils
from typing import cast, Optional, TypedDict, Dict, List, Union
# Path under which ZFS exposes zvol block device nodes.
ZVOL_DIR = "/dev/zvol"
# Dataset name (under the pool container) holding temporary export clones.
EXPORTED = ".exported"
# Prefix for revision (snapshot) names managed by this driver.
REVISION_PREFIX = "qubes-"
# Executable names used when shelling out.
_sudo, _dd, _zfs, _zpool = "sudo", "dd", "zfs", "zpool"
logging.getLogger(__name__).setLevel(logging.DEBUG)


class Vid(str):
    """Volume identifier: ``container/vm_name/volume_name``."""

    @classmethod
    def make(klass, container: str, vm_name: str, volume_name: str) -> "Vid":
        """Build a Vid from its three path components."""
        return Vid("{!s}/{!s}/{!s}".format(container, vm_name, volume_name))


def timestamp_to_revision(timestamp: Union[int, str, float]) -> str:
    """
    Convert a timestamp to a revision name.

    The expected output below is spelled with single quotes so the
    doctest matches Python's actual repr (the original used double
    quotes and would fail under doctest).

    >>> timestamp_to_revision(123)
    'qubes-123'
    """
    return REVISION_PREFIX + str(int(timestamp))


def dataset_in_root(dataset: str, root: str) -> bool:
    """True if *dataset* is *root* itself or lies strictly under it."""
    return dataset == root or (dataset + "/").startswith(root + "/")


def is_revision_dataset(fsname: str) -> bool:
    """True if the snapshot part of *fsname* is a driver-managed revision."""
    return fsname.split("@")[-1].startswith(REVISION_PREFIX)


def timestamp_from_revision(fsname: str) -> int:
    """Extract the integer timestamp from a revision snapshot name."""
    ln = len(REVISION_PREFIX)
    return int(fsname.split("@")[-1][ln:])
async def dd(
    inpath: str,
    outpath: str,
    log: logging.Logger = logging.getLogger(__name__),
):
    """
    Sparsely copy *inpath* onto the pre-existing file/device *outpath*.

    Runs dd under sudo when the current user lacks read or write access
    to the respective path.  Raises
    qubes.storage.StoragePoolException when dd exits nonzero.
    """
    args = [
        _dd,
        "if=" + inpath,
        "of=" + outpath,
        "conv=sparse,nocreat,fsync",
        "status=progress",
        "bs=1M",
    ]
    # Escalate privileges only when direct access is not possible.
    if not os.access(outpath, os.W_OK) or not os.access(inpath, os.R_OK):
        args.insert(0, _sudo)
    log.debug("Invoked with arguments %r", args)
    proc = await asyncio.create_subprocess_exec(*args)
    returncode = await proc.wait()
    if returncode != 0:
        raise qubes.storage.StoragePoolException(
            "%s failed with error %s" % (args, returncode)
        )
def _process_zfs_output(returncode, stdout, stderr, log) -> str:
"""Process output of ZFS, determine if the call was successful and
possibly log warnings."""
# Filter out warning about intended over-provisioning.
# Upstream discussion about missing option to silence it:
# https://bugzilla.redhat.com/1347008
err = "\n".join(line for line in stderr.decode().splitlines())
if stdout:
log.debug("Result: %s", stdout.decode().rstrip())
if returncode == 0 and err:
log.warning("Stderr: %s", err)
elif returncode != 0:
assert err, "Command exited unsuccessful, but printed nothing to stderr"
err = err.replace("%", "%%")
raise qubes.storage.StoragePoolException(err)
return stdout.decode()
def qubes_zfs(
    *cmd: str,
    log: logging.Logger = logging.getLogger(__name__),
) -> str:
    """
    Run a zfs operation synchronously and return its stdout.

    When the first argument is ``"zpool"``, the zpool program is run on
    the remaining arguments instead.  Raises
    qubes.storage.StoragePoolException on failure.
    """
    # Force the C.UTF-8 locale so tool output stays machine-parseable.
    # BUGFIX: the override must come *after* os.environ in the merge;
    # the previous order let an inherited LC_ALL win.
    environ = {**os.environ, "LC_ALL": "C.UTF-8"}
    if cmd and cmd[0] == "zpool":
        thecmd = [_zpool] + list(cmd)[1:]
    else:
        thecmd = [_zfs] + list(cmd)
    if os.getuid() != 0:
        # Not root: escalate via sudo.
        thecmd = [_sudo] + thecmd
    log.debug("Invoked with arguments %r", thecmd)
    p = subprocess.Popen(
        thecmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        env=environ,
    )
    out, err = p.communicate()
    return _process_zfs_output(p.returncode, out, err, log)
async def qubes_zfs_coro(
    *cmd: str,
    log: logging.Logger = logging.getLogger(__name__),
) -> str:
    """
    Asynchronous wrapper around :func:`qubes_zfs`.

    Runs the blocking call in the default executor so the event loop is
    not stalled by the subprocess.
    """

    def synced() -> str:
        return qubes_zfs(*cmd, log=log)

    # get_running_loop() is the correct call inside a coroutine and
    # avoids the deprecated implicit-loop behavior of get_event_loop().
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, synced)
class ZFSAccessor(object):
    """
    Thin wrapper around the zfs/zpool command-line tools, scoped to a
    root container dataset.  Every dataset passed to a volume method
    must live under ``root``; this is asserted.
    """

    def __init__(self, root: str) -> None:
        # Root container dataset; the pool name is its first component.
        self.root = root

    def _dataset_int(
        self,
        prop: str,
        dataset: str,
        log: logging.Logger,
    ) -> int:
        """Read an integer-valued property of *dataset* via zfs list."""
        return int(
            qubes_zfs("list", "-Hp", "-o", prop, dataset, log=log).strip()
        )

    def _pool_int(self, prop: str, log: logging.Logger) -> int:
        """Read an integer-valued property of the backing zpool."""
        return int(
            qubes_zfs(
                "zpool",
                "list",
                "-Hp",
                "-o",
                prop,
                self.root.split("/")[0],
                log=log,
            ).strip()
        )

    async def volume_exists_async(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> bool:
        """True if *volume* exists (asynchronous)."""
        assert dataset_in_root(volume, self.root)
        try:
            await qubes_zfs_coro("list", "-Hp", "-o", "name", volume, log=log)
            return True
        except qubes.storage.StoragePoolException:
            return False

    def volume_exists(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> bool:
        """True if *volume* exists (synchronous)."""
        assert dataset_in_root(volume, self.root)
        try:
            qubes_zfs("list", volume, log=log)
            return True
        except qubes.storage.StoragePoolException:
            return False

    async def remove_volume_async(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Destroy *volume* recursively (including snapshots/children)."""
        assert dataset_in_root(volume, self.root)
        await qubes_zfs_coro("destroy", "-r", volume, log=log)

    async def snapshot_volume_async(
        self,
        volumesnapshotname: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Create the snapshot given as ``dataset@snapname``."""
        assert "@" in volumesnapshotname
        dataset = volumesnapshotname.split("@")[0]
        assert dataset_in_root(dataset, self.root)
        await qubes_zfs_coro("snapshot", volumesnapshotname, log=log)

    async def rollback_to_snapshot_async(
        self,
        volume: str,
        snapshot: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """
        Roll *volume* back to *snapshot* (a bare snapshot name, without
        the ``dataset@`` part), destroying more recent snapshots (-r).

        This method was previously missing although ZFSVolume.revert
        calls it.
        """
        assert dataset_in_root(volume, self.root)
        await qubes_zfs_coro(
            "rollback",
            "-r",
            "%s@%s" % (volume, snapshot),
            log=log,
        )

    async def clone_snapshot_to_volume_async(
        self,
        source: str,
        dest: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Clone snapshot *source* (``dataset@snap``) to dataset *dest*,
        creating parent datasets as needed (-p)."""
        assert "@" in source
        await qubes_zfs_coro("clone", "-p", source, dest, log=log)

    async def rename_volume_async(
        self,
        source: str,
        dest: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Rename dataset *source* to *dest*."""
        await qubes_zfs_coro("rename", source, dest, log=log)

    async def create_volume_async(
        self,
        volume: str,
        size: int,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Create a sparse (-s) zvol of *size* bytes, with parents (-p)."""
        assert dataset_in_root(volume, self.root)
        await qubes_zfs_coro(
            "create",
            "-p",
            "-s",
            "-V",
            str(size),
            volume,
            log=log,
        )

    async def set_volume_readonly_async(
        self,
        volume: str,
        readonly: bool,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Set the ZFS readonly property on *volume*."""
        assert dataset_in_root(volume, self.root)
        await qubes_zfs_coro(
            "set",
            "readonly=%s" % ("on" if readonly else "off"),
            volume,
            log=log,
        )

    async def resize_volume_async(
        self,
        volume: str,
        size: int,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Resize the zvol to *size* bytes."""
        assert dataset_in_root(volume, self.root)
        # BUGFIX: the zvol size property is "volsize"; "size" is not a
        # settable ZFS property, so the previous code always failed.
        await qubes_zfs_coro(
            "set",
            "volsize=%s" % size,
            volume,
            log=log,
        )

    async def set_volume_dirty_async(
        self,
        volume: str,
        dirty: bool,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Record the driver's dirty flag in a ZFS user property."""
        assert dataset_in_root(volume, self.root)
        await qubes_zfs_coro(
            "set",
            "org.qubes:dirty=%s" % ("on" if dirty else "off"),
            volume,
            log=log,
        )

    def is_volume_dirty(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> bool:
        """True if the org.qubes:dirty user property of *volume* is on."""
        assert dataset_in_root(volume, self.root)
        v = qubes_zfs(
            "list",
            "-Hp",
            "-o",
            "org.qubes:dirty",
            volume,
            log=log,
        ).strip()
        return v == "on"

    async def is_volume_dirty_async(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> bool:
        """Asynchronous variant of :meth:`is_volume_dirty`."""
        assert dataset_in_root(volume, self.root)
        # BUGFIX: the coroutine must be awaited *before* .strip() is
        # called; the previous code called .strip() on the coroutine
        # object itself (AttributeError at runtime).
        out = await qubes_zfs_coro(
            "list",
            "-Hp",
            "-o",
            "org.qubes:dirty",
            volume,
            log=log,
        )
        return out.strip() == "on"

    def get_volume_snapshots(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> List[str]:
        """List the full ``dataset@snap`` names of *volume*'s snapshots."""
        assert dataset_in_root(volume, self.root)
        out = qubes_zfs(
            "list",
            "-Hp",
            "-o",
            "name",
            "-t",
            "snapshot",
            volume,
            log=log,
        )
        return [line for line in out.splitlines() if line.strip()]

    def get_volume_usage(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> int:
        """Bytes used by *volume* (the ZFS "used" property)."""
        assert dataset_in_root(volume, self.root)
        return self._dataset_int("used", volume, log)

    def get_volume_creation(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> int:
        """Creation time of *volume* as a Unix timestamp."""
        assert dataset_in_root(volume, self.root)
        return self._dataset_int("creation", volume, log)

    def get_volume_size(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> int:
        """Logical size of the zvol in bytes (the "volsize" property)."""
        assert dataset_in_root(volume, self.root)
        return self._dataset_int("volsize", volume, log)

    def get_pool_available(
        self,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> int:
        """Free bytes in the backing zpool."""
        return self._pool_int("free", log)

    def get_pool_size(
        self,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> int:
        """Total size in bytes of the backing zpool."""
        return self._pool_int("size", log)
class ZFSVolume(qubes.storage.Volume):
    """
    ZFS thin (sparse zvol) volume implementation.
    """

    def __init__(
        self,
        name: str,
        pool: "ZFSPool",
        vid: Vid,
        revisions_to_keep: int = 1,
        rw: bool = False,
        save_on_stop: bool = False,
        size: int = 0,
        snap_on_start: bool = False,
        source: Optional[qubes.storage.Volume] = None,
        ephemeral: Optional[bool] = None,
        **kwargs,
    ):
        """Forward all configuration to the base Volume, then set up a
        per-pool logger and narrow the pool attribute's type."""
        super().__init__(
            name=name,
            pool=pool,
            vid=vid,
            revisions_to_keep=revisions_to_keep,
            rw=rw,
            save_on_stop=save_on_stop,
            size=size,
            snap_on_start=snap_on_start,
            source=source,
            ephemeral=ephemeral,
            **kwargs,
        )
        # cast() is a typing no-op; the base class stores pool untyped.
        self.pool = cast(ZFSPool, self.pool)
        self.log = logging.getLogger("%s.%s" % (__name__, self.pool.name))
@property
def path(self):
    """The zvol dataset name of this volume (identical to the vid)."""
    return self.vid
@property
def revisions(self) -> Dict[str, str]:
    """Map of revision name (``qubes-<ts>``) to ISO date, built from the
    driver-managed snapshots of this volume."""
    result: Dict[str, str] = {}
    for snap in self.pool.accessor.get_volume_snapshots(
        self.path,
        log=self.log,
    ):
        if not is_revision_dataset(snap):
            continue  # snapshot not created by this driver; skip it
        ts = timestamp_from_revision(snap)
        result[timestamp_to_revision(ts)] = qubes.storage.isodate(ts)
    return result
async def _purge_old_revisions(self):
    """Destroy revision snapshots beyond ``revisions_to_keep``,
    keeping the newest ones (ordered by ISO date)."""
    self.log.debug("_purge_old_revisions %s", self.path)
    revisions = self.revisions
    if not revisions:
        return
    newest_first = sorted(
        revisions.items(),
        key=lambda kv: kv[1],
        reverse=True,
    )
    keep = self.revisions_to_keep
    for snapshot, _unused in newest_first[keep:]:
        await self.pool.accessor.remove_volume_async(
            "%s@%s" % (self.path, snapshot),
            log=self.log,
        )
# Cached volsize in bytes; reset to None by operations that may change
# the on-disk size so the next read refreshes it.
_cached_size: Optional[int] = None

@property
def size(self) -> Optional[int]:
    """Volume size in bytes (cached); None when it cannot be read."""
    if self._cached_size is None:
        try:
            self._cached_size = self.pool.accessor.get_volume_size(
                self.path,
                log=self.log,
            )
        except qubes.storage.StoragePoolException:
            # The dataset may not exist yet; leave the cache unset.
            pass
    return self._cached_size
@property
def usage(self) -> int:
    """Bytes used by the volume; 0 when it cannot be determined."""
    try:
        return self.pool.accessor.get_volume_usage(
            self.path,
            log=self.log,
        )
    except qubes.storage.StoragePoolException:
        return 0
async def _clone_from(self, source: qubes.storage.Volume):
    """
    Replace this volume's contents with the contents of *source*.

    A ZFS source is cloned from its newest revision snapshot; any other
    source is copied block-by-block (sparsely) through dd.
    """
    # FIXME use suffix in create empty or clone before destroying final
    self.log.debug("Cloning into %s from %s", self.path, source)
    self._cached_size = None
    if isinstance(source, ZFSVolume):
        # ZFS-to-ZFS: clone from the newest committed revision.
        # FIXME: perhaps clone to a temp dataset
        # before deleting the target dataset.
        revs = source.revisions
        assert revs, "the source %s has no revisions" % (source.path,)
        newest = sorted(revs.items(), key=lambda kv: kv[1])[-1][0]
        if await self.pool.accessor.volume_exists_async(
            self.path,
            log=self.log,
        ):
            self.log.debug(
                "Volume %s exists, removing prior to ZFS clone",
                self.path,
            )
            await self.pool.accessor.remove_volume_async(
                self.path,
                log=self.log,
            )
        snapname = "%s@%s" % (source.path, newest)
        self.log.debug(
            "Creating volume %s with cloning from %s",
            self.path,
            snapname,
        )
        await self.pool.accessor.clone_snapshot_to_volume_async(
            snapname,
            self.path,
            log=self.log,
        )
    else:
        # Foreign source: create an empty zvol large enough to hold the
        # source (or larger if requested by the user), then dd the
        # exported contents into it sparsely.
        self.log.debug("This is not a ZFS volume")
        assert self._size, "no size set for this volume"
        # FIXME optimize: if the volume exists but is smaller than the
        # source, simply grow the volume instead of nuking it completely.
        self.log.debug("Sizes in play: %s %s", self._size, source.size)
        wanted = max([self._size, source.size])
        self.log.debug("Creating empty volume %s for cloning", self.path)
        if await self.pool.accessor.volume_exists_async(
            self.path,
            log=self.log,
        ):
            self.log.debug(
                "Volume %s exists, removing prior to non-ZFS clone",
                self.path,
            )
            await self.pool.accessor.remove_volume_async(
                self.path,
                log=self.log,
            )
        await self.pool.accessor.create_volume_async(
            self.path,
            wanted,
            log=self.log,
        )
        infile = await source.export()
        try:
            self.log.debug(
                "Copying %s to %s",
                infile,
                os.path.join(ZVOL_DIR, self.path),
            )
            await dd(infile, os.path.join(ZVOL_DIR, self.path), self.log)
        finally:
            # Always release the source's export, even if dd failed.
            await source.export_end(infile)
async def _create_empty(
    self,
    size: Optional[int] = None,
    suffix: str = "",
) -> None:
    """(Re)create ``path + suffix`` as an empty zvol of *size* bytes
    (default: the configured volume size), destroying any existing
    dataset of that name first."""
    self.log.debug(
        "_create_empty %s with size %s self._size %s",
        self.path,
        size,
        self._size,
    )
    self._cached_size = None
    if size is None:
        # Fall back to the configured size; it must be set.
        assert self._size
    target = self.path + suffix
    if await self.pool.accessor.volume_exists_async(
        target,
        log=self.log,
    ):
        await self.pool.accessor.remove_volume_async(
            target,
            log=self.log,
        )
    await self.pool.accessor.create_volume_async(
        target,
        self._size if size is None else size,
        log=self.log,
    )
async def _remove_all_exported(self) -> bool:
    """Destroy the dataset holding export clones of this volume, if it
    exists.  Returns True when something was removed."""
    self.log.debug("_remove_all_exported %s", self.path)
    exported = os.path.join(
        self.pool.container,
        EXPORTED,
        self.vid.replace("/", "_"),
    )
    if not await self.pool.accessor.volume_exists_async(
        exported,
        log=self.log,
    ):
        return False
    await self.pool.accessor.remove_volume_async(
        exported,
        log=self.log,
    )
    return True
async def _remove_unlocked(self, suffix: str = "") -> "ZFSVolume":
    """Remove this volume (or its suffixed sibling) without taking the
    volume lock.  Removing the volume itself also drops any export
    clones derived from it."""
    self.log.debug("Removing %s with suffix %s", self.path, suffix)
    if suffix == "":
        # Export clones only exist for the unsuffixed volume.
        await self._remove_all_exported()
    target = self.path + suffix
    if await self.pool.accessor.volume_exists_async(
        target,
        log=self.log,
    ):
        self._cached_size = None
        await self.pool.accessor.remove_volume_async(
            target,
            log=self.log,
        )
    return self
async def _adopt(self, suffix: str):
    """
    Replace this volume with its suffixed sibling (e.g. after a
    successful import): remove the current dataset, then rename
    ``path + suffix`` over it.

    Must only be called while the volume lock is already held.
    """
    self.log.debug("Adopting %s from suffix %s", self.path, suffix)
    self._cached_size = None
    # BUGFIX: use the unlocked remover.  _adopt runs from methods that
    # already hold the volume lock (import_data_end), and the locked
    # self.remove() would try to take the same non-reentrant lock
    # again, deadlocking.
    await self._remove_unlocked()
    await self.pool.accessor.rename_volume_async(
        self.path + suffix,
        self.path,
        log=self.log,
    )
@qubes.storage.Volume.locked
async def remove(self, suffix: str = "") -> "ZFSVolume":
    """Locked public wrapper around :meth:`_remove_unlocked`."""
    return await self._remove_unlocked(suffix=suffix)
@qubes.storage.Volume.locked
async def create(self) -> "ZFSVolume":
    """Create the dataset now only for save_on_stop volumes; volatile
    volumes are (re)created by start() instead."""
    self.log.debug("Creating %s", self.path)
    if self.snap_on_start and self.save_on_stop:
        assert 0, "snap_on_start true && save_on_stop true"
    if self.save_on_stop:
        if self.source:
            await self._clone_from(self.source)
        else:
            await self._create_empty()
    return self
@qubes.storage.Volume.locked
async def start(self):
    """Prepare the volume for qube startup according to its
    (snap_on_start, save_on_stop) type, then apply read-only state."""
    self.log.debug("Starting volume %s", self.path)
    snap_on_start = self.snap_on_start
    save_on_stop = self.save_on_stop
    if snap_on_start and save_on_stop:
        assert 0, "snap_on_start && save_on_stop on %s" % self.path
    if not snap_on_start and not save_on_stop:
        # Volatile: recreated fresh on every start.
        if self.source:
            self.log.debug(
                "Cloning volatile %s from %s",
                self.path,
                self.source,
            )
            await self._clone_from(self.source)
        else:
            self.log.debug(
                "Creating volatile %s empty",
                self.path,
            )
            await self._create_empty()
    elif save_on_stop:
        # Private / persistent: the dataset already exists; just flag
        # it dirty until the qube stops cleanly.
        self.log.debug("Dirtying up %s", self.path)
        await self.pool.accessor.set_volume_dirty_async(
            self.path,
            True,
            log=self.log,
        )
    else:
        # Root / ephemeral: rebuild from source (or empty).
        if self.source:
            self.log.debug(
                "Cloning ephemeral %s from %s",
                self.path,
                self.source,
            )
            await self._clone_from(self.source)
        else:
            self.log.debug(
                "Creating ephemeral %s empty",
                self.path,
            )
            await self._create_empty()
    await self.pool.accessor.set_volume_readonly_async(
        self.path,
        not self.rw,
        log=self.log,
    )
@qubes.storage.Volume.locked
async def stop(self) -> "ZFSVolume":
    """Tear down or commit the volume after qube shutdown."""
    self.log.debug("Stopping volume %s", self.path)
    snap_on_start = self.snap_on_start
    save_on_stop = self.save_on_stop
    if snap_on_start and save_on_stop:
        assert 0, "snap_on_start && save_on_stop on %s" % self.path
    if not snap_on_start and not save_on_stop:
        # Volatile: dispose of the dataset entirely.
        await self._remove_unlocked()
    elif save_on_stop:
        # Private / persistent: clear the dirty flag, then snapshot.
        if await self.pool.accessor.is_volume_dirty_async(self.path):
            await self.pool.accessor.set_volume_dirty_async(
                self.path,
                False,
                log=self.log,
            )
        # Snapshot in every case so the now-clean contents can be
        # exported or cloned later.
        await self.pool.accessor.snapshot_volume_async(
            "%s@%s"
            % (
                self.path,
                timestamp_to_revision(time.time()),
            ),
            log=self.log,
        )
        await self._purge_old_revisions()
    # else: root / ephemeral -- kept as-is, recreated on next start.
    return self
async def export(self) -> str:
    """Clone the newest revision into the exported area and return a
    block device path that can be `open()`ed."""
    self.log.debug("Start of export of %s", self.path)
    revs = self.revisions
    assert revs, "%s has no revisions" % self.path
    # Newest revision by ISO date.
    latest = sorted(revs.items(), key=lambda kv: kv[1])[-1][0]
    exported = os.path.join(
        self.pool.container,
        EXPORTED,
        self.vid.replace("/", "_"),
        latest,
    )
    await self.pool.accessor.clone_snapshot_to_volume_async(
        "%s@%s" % (self.path, latest),
        exported,
        log=self.log,
    )
    return os.path.join(ZVOL_DIR, exported)
async def export_end(self, exported_path: str) -> None:
    """Destroy the clone previously produced by :meth:`export`."""
    self.log.debug(
        "End of export of %s to path %s",
        self.path,
        exported_path,
    )
    # The last path component of the exported device is the snapshot
    # (revision) name the clone was created from.
    snapname = os.path.basename(exported_path)
    clone = os.path.join(
        self.pool.container,
        EXPORTED,
        self.vid.replace("/", "_"),
        snapname,
    )
    await self.pool.accessor.remove_volume_async(
        clone,
        log=self.log,
    )
def block_device(self):
    """Return :py:class:`qubes.storage.BlockDevice` for serialization in
    the libvirt XML template as <disk>."""
    devpath = os.path.join(ZVOL_DIR, self.path)
    return qubes.storage.BlockDevice(
        devpath,
        self.name,
        None,  # no encryption key
        self.rw,
        self.domain,
        self.devtype,
    )
@qubes.storage.Volume.locked
async def import_volume(
    self,
    src_volume: qubes.storage.Volume,
) -> "ZFSVolume":
    """Import the committed contents of *src_volume* into this volume.
    Sources without committed state (no save_on_stop) are ignored."""
    self.log.debug(
        "Importing volume %s from source %s",
        self.path,
        src_volume,
    )
    if not src_volume.save_on_stop:
        return self
    if self.is_dirty():
        raise qubes.storage.StoragePoolException(
            "Cannot import to dirty volume {} -"
            " start and stop a qube to cleanup".format(self.path)
        )
    self._cached_size = None
    await self._clone_from(src_volume)
    return self
def abort_if_import_in_progress(self):
    """Raise when a partially written ``-import-data`` dataset exists."""
    in_progress = self.pool.accessor.volume_exists(
        self.path + "-import-data",
        log=self.log,
    )
    if in_progress:
        raise qubes.storage.StoragePoolException(
            "Import operation in progress on {}".format(self.path)
        )
@qubes.storage.Volume.locked
async def import_data(self, size) -> str:
    """Create a temporary ``-import-data`` zvol of *size* bytes and
    return its block device path (an object that can be `open()`ed)."""
    self.log.debug("Importing data of size %s into %s", size, self.path)
    if self.is_dirty():
        raise qubes.storage.StoragePoolException(
            "Cannot import data to dirty volume {} -"
            " stop the qube using it first".format(self.path)
        )
    self.abort_if_import_in_progress()  # FIXME
    await self._create_empty(size, suffix="-import-data")
    return os.path.join(ZVOL_DIR, self.path + "-import-data")
@qubes.storage.Volume.locked
async def import_data_end(self, success):
    """Either commit imported data, or discard the temporary volume."""
    self.log.debug("End of importing data into %s", self.path)
    if success:
        await self._adopt(suffix="-import-data")
    else:
        # BUGFIX: this method already holds the volume lock, so it must
        # not call the locked remove(); the non-reentrant lock would
        # deadlock.  Use the unlocked remover directly.
        await self._remove_unlocked(suffix="-import-data")
def is_dirty(self) -> bool:
    """True when the volume is started but not yet cleanly stopped.
    Only save_on_stop volumes can be dirty."""
    if self.save_on_stop:
        # BUGFIX: is_volume_dirty() already returns a bool, so the
        # previous comparison '"on" == <bool>' was always False and
        # dirty volumes were never detected (breaking the guards in
        # import/revert/resize paths).
        return self.pool.accessor.is_volume_dirty(
            self.path,
            log=self.log,
        )
    return False
@qubes.storage.Volume.locked
async def resize(self, size):
    """
    Expand the volume to *size* bytes.

    Raises
    :py:class:`qubes.storage.StoragePoolException` if the requested
    size is smaller than the current size (shrinking is not supported).
    """
    self.log.debug("Resizing %s to %s", self.path, size)
    current = self.size
    # ROBUSTNESS: self.size is Optional and returns None when the
    # dataset cannot be read (e.g. it does not exist yet); the previous
    # code then raised TypeError on the comparisons below.
    if current is not None:
        if size == current:
            return
        if size < current:
            raise qubes.storage.StoragePoolException(
                "Shrinking of ZFS volume %s is not possible" % (self.path,)
            )
    self._cached_size = None
    await self.pool.accessor.resize_volume_async(
        self.path,
        size,
        log=self.log,
    )
def is_outdated(self):
    """True when this snap_on_start volume predates its source's
    newest revision snapshot."""
    self.log.debug("is_outdated %s", self.path)
    if not self.snap_on_start:
        return False
    if not self.is_dirty():
        return False
    if not self.source:
        return False
    assert isinstance(self.source, ZFSVolume)
    revs = self.source.revisions.items()
    ordered = sorted(revs, key=lambda kv: kv[1])
    assert revs
    newest_source_isodate = ordered[-1][1]
    created = self.pool.accessor.get_volume_creation(
        self.path,
        log=self.log,
    )
    # Compare ISO-formatted dates lexicographically.
    return newest_source_isodate > qubes.storage.isodate(created)
@qubes.storage.Volume.locked
async def revert(self, revision: Optional[str] = None):
    """
    Roll the volume back to *revision* (default: the newest revision).

    Raises StoragePoolException when the volume is dirty or an import
    is in progress.
    """
    # TYPE FIX: the parameter was annotated `str = None` (implicit
    # Optional, rejected by modern type checkers).
    self.log.debug("revert %s to %s", self.path, revision)
    if self.is_dirty():
        raise qubes.storage.StoragePoolException(
            "Cannot revert dirty volume {} -"
            " stop the qube first".format(
                self.path,
            )
        )
    self.abort_if_import_in_progress()
    snaps = self.revisions
    assert snaps, "volume %s has no revisions" % (self.path,)
    if revision is None:
        # Pick the newest revision by its ISO date.
        snap = sorted(snaps.items(), key=lambda kv: kv[1])[-1][0]
    else:
        snap = revision
    await self.pool.accessor.rollback_to_snapshot_async(
        self.path,
        snap,
        log=self.log,
    )
    return self
async def verify(self) -> bool:
    """Verify that the volume's dataset exists when it should."""
    self.log.debug("verify %s", self.path)
    if not self.snap_on_start and not self.save_on_stop:
        # Volatile volumes are created at start; nothing to check now.
        return True
    if await self.pool.accessor.volume_exists_async(
        self.path,
        log=self.log,
    ):
        return True
    raise qubes.storage.StoragePoolException(
        "volume {} missing".format(
            self.path,
        )
    )
@dataclasses.dataclass
class ZFSSnapshot:
    """A snapshot name paired with its creation timestamp."""

    name: str
    timestamp: int
@dataclasses.dataclass
class ZFSVolumeStat:
    """Size/usage statistics for a single volume."""

    vid: Vid
    size: int
    used: int
    snapshots: List[ZFSSnapshot] = dataclasses.field(default_factory=list)
@dataclasses.dataclass
class ZFSVolumeCache:
    """Pool-wide statistics cache with its last-refresh time."""

    cache: Dict[Vid, ZFSVolumeStat] = dataclasses.field(default_factory=dict)
    pool_used: int = 0
    pool_available: int = 0
    time: float = 0
class ZFSPoolConfig(TypedDict):
    """Shape of the dictionary returned by ``ZFSPool.config``."""

    name: str
    container: str
    driver: str
    revisions_to_keep: int
    ephemeral_volatile: bool
class ZFSPool(qubes.storage.Pool):
    """ZFS thin storage for Qubes OS.

    Volumes are stored as sparse ZFS zvols under the container dataset
    given by the *container* argument, named::

        {container}/{vm_name}/{volume_name}

    Volume contents across a qube's lifecycle depend on the
    (snap_on_start, save_on_stop) flags:

    (False, False) -- volatile:
        on start, the volume is recursively destroyed and recreated to
        its specifications (or cloned from its source); on stop, it is
        removed completely.

    (False, True) -- private / full persistence:
        on start, the volume is used as-is and flagged dirty; on stop,
        a revision snapshot of the now-clean contents is taken and
        revision snapshots beyond revisions_to_keep (not counting the
        volume itself) are removed.

    (True, False) -- root / volatile:
        on start, the volume is recursively destroyed and recreated as
        a clone of the last committed state of its source volume; on
        stop, the volume is kept, knowing it will be recreated on the
        next start.

    (True, True) -- unsupported.

    Revision names have the form ``qubes-{timestamp}`` (snapshot name
    ``@qubes-{timestamp}``), with the timestamp in seconds since the
    Unix epoch ('%s' format).
    """  # pylint: disable=protected-access

    driver = "zfs"
def __init__(
    self,
    *,
    name: str,
    revisions_to_keep: int = 1,
    container: str,
    ephemeral_volatile: bool = False,
):
    """Initialize the pool over *container* (a ZFS dataset name)."""
    super().__init__(
        name=name,
        revisions_to_keep=revisions_to_keep,
        ephemeral_volatile=ephemeral_volatile,
    )
    self.container = container
    # Volumes handed out by init_volume(), keyed by vid.
    self._volume_objects_cache: Dict[Vid, ZFSVolume] = {}
    self.log = logging.getLogger("%s.%s" % (__name__, self.name))
    self.accessor = ZFSAccessor(self.container)
def __repr__(self):
    """Debugging representation showing pool name and container."""
    return "<{} at {:#x} name={!r} container={!r}>".format(
        type(self).__name__, id(self), self.name, self.container
    )
@property
def config(self) -> ZFSPoolConfig:
    """Serializable pool configuration."""
    return ZFSPoolConfig(
        name=self.name,
        container=self.container,
        driver=self.driver,
        revisions_to_keep=self.revisions_to_keep,
        ephemeral_volatile=self.ephemeral_volatile,
    )
async def destroy(self):
    """
    Destroy this pool.

    Currently a no-op; a full implementation would recursively
    ``zfs destroy`` the container dataset.
    """
    # TODO Should we remove an existing pool?
def init_volume(self, vm, volume_config):
    """
    Initialize a :py:class:`qubes.storage.Volume` from `volume_config`.
    """
    c = volume_config
    if "vid" not in c:
        if vm and hasattr(vm, "name"):
            vm_name = vm.name
        else:
            # for the future if we have volumes not belonging to a VM
            vm_name = qubes.utils.random_string()
        vid = Vid.make(self.container, vm_name, c["name"])
    else:
        vid = c["vid"]
    revisions_to_keep = (
        self.revisions_to_keep
        if "revisions_to_keep" not in c
        else c["revisions_to_keep"]
    )
    volume = ZFSVolume(
        c["name"],
        self,
        vid,
        revisions_to_keep,
        c.get("rw", False),
        c.get("save_on_stop", False),
        c.get("size", 0),
        # BUGFIX: this key was misspelled "snap_on_Start", so the flag
        # was silently always False and root volumes were misconfigured.
        c.get("snap_on_start", False),
        c.get("source", None),
        c.get("ephemeral", self.ephemeral_volatile),
    )
    self._volume_objects_cache[vid] = volume
    return volume
async def __init_container(self):
    """Create the container dataset if it does not exist yet."""
    try:
        await qubes_zfs_coro("list", self.container)
    except qubes.storage.StoragePoolException:
        # Missing: create it (and parents, -p) without a mountpoint,
        # since it only serves as an anchor for zvols.
        await qubes_zfs_coro(
            "create",
            "-o",
            "mountpoint=none",
            "-p",
            self.container,
        )
async def setup(self):
    """Prepare the pool for use by ensuring the container exists."""
    await self.__init_container()
def get_volume(self, vid: Vid):
    """Return a volume with the given vid."""
    try:
        return self._volume_objects_cache[vid]
    except KeyError:
        # Don't cache this object, as it doesn't carry full configuration.
        return ZFSVolume("unconfigured", self, vid)
def list_volumes(self) -> List[ZFSVolume]:
    """Return a list of volumes managed by this pool."""
    return list(self._volume_objects_cache.values())
@property
def size(self) -> int:
    """Total size of the backing zpool, in bytes."""
    return self.accessor.get_pool_size()
@property
def usage(self) -> int:
    """
    Usage of the pool in percent (0-100).

    Queries the pool synchronously, just like the LVM driver does.
    """
    # PERF: query each figure once.  The previous code invoked zfs
    # three times per call (self.size plus two more zpool list calls),
    # each of which spawns a subprocess.
    total = self.accessor.get_pool_size()
    used = total - self.accessor.get_pool_available()
    return int(used / total * 100)
@property
def usage_details(self):
    """
    Return usage details of the pool.

    Synchronously refreshes the data by querying the pool.
    """
    # ZFS does not expose metadata size/usage figures comparable to the
    # LVM thin-pool ones, so those are reported as zero.
    return {
        "data_size": self.size,
        "data_usage": self.usage,
        "metadata_size": 0,
        "metadata_usage": 0,
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment