Skip to content

Instantly share code, notes, and snippets.

@maximilize
Created September 13, 2018 03:01
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maximilize/62071aed9d7b55f4a887cc56a6d91785 to your computer and use it in GitHub Desktop.
Save maximilize/62071aed9d7b55f4a887cc56a6d91785 to your computer and use it in GitHub Desktop.
Script to detect and shutdown not needed qubes VM's
#!/usr/bin/env python
import subprocess
import re
from collections import namedtuple
import binascii
import time
import json
IGNORE = """
nm-applet
xdg-desktop-portal-gtk
gsd-xsettings
mate-notification-daemon
"""
IGNORE = [line.strip() for line in IGNORE.splitlines()]
IGNORE = set(IGNORE)
try:
with open("/run/qubes/vm-idle-state.json", "r") as f:
PERSIST = json.load(f)
except Exception:
PERSIST = {}
PERSIST_DEFAULTS = {
"state": None,
"netvm": None,
"autostart": None,
"in_use": None,
"run_id": 0,
"idle": 0,
"user_warned": 0
}
IDLE_TRESHOLD = 300
STOP_SECONDS = 60
class VM:
def __init__(self, name, state, netvm, autostart):
self.name = name
self.state = state
self.netvm = netvm
self.autostart = autostart == "True"
self.titles = set()
self.klasses = set()
self.names = set()
if self.name in PERSIST:
temp = PERSIST[self.name]
PERSIST[self.name] = dict(PERSIST_DEFAULTS)
PERSIST[self.name].update(temp)
else:
PERSIST[self.name] = dict(PERSIST_DEFAULTS)
self.persist = PERSIST[self.name]
def __str__(self):
return "VM(%s, netvm=%s, autostart=%s, in_use=%s, run_id=%s)" % (
self.name, self.netvm, self.autostart, self.in_use, self.run_id)
@property
def in_use(self):
if vm.autostart:
return True
if vm.titles and vm.klasses and vm.names:
return True
return False
@property
def run_id(self):
x = '/'.join(sorted(self.titles | self.klasses | self.names))
return binascii.crc32(x.encode('utf-8'))
def load_windows(self):
try:
output = subprocess.check_output([
"/usr/bin/qvm-run",
"--pass-io",
self.name,
"--",
"/usr/bin/xwininfo -root -children"]).decode("utf-8")
except subprocess.CalledProcessError:
print("VM %s is missing the xwininfo tool" % self.name)
self.titles.add("__TOOL_MISSING__")
return
for line in output.splitlines():
line = line.strip()
if not line.startswith("0x"):
continue
id, line = line.split(' ', 1)
title, line = line.split(': (')
infos, line = line.split(') ', 1)
size, size2 = line.split(' ')
title = title.strip('"')
if title == '(has no name)':
title = ''
if infos:
klass, name = infos[1:-1].split('" "', 1)
else:
klass, name = '', ''
self.titles.add(title)
self.klasses.add(klass)
self.names.add(name)
self.titles -= IGNORE
self.klasses -= IGNORE
self.names -= IGNORE
def notify_send(caption, text):
print("%s: %s" % (caption, text))
subprocess.check_output(["/usr/bin/notify-send", caption, text])
vms = []
output = subprocess.check_output([
"/usr/bin/qvm-ls",
"--no-spinner",
"--fields",
"name,state,netvm,autostart"]).decode("utf-8")
for line in output.splitlines()[1:]:
line = re.sub(r" +", " ", line)
vm = VM(*line.split(" "))
if vm.name != 'dom0':
vms.append(vm)
for vm in vms:
# Check open windows inside VM
if not vm.autostart and vm.state == 'Running':
vm.load_windows()
# Update the persistent VM state
for key in ("state", "netvm", "autostart", "run_id"):
if vm.persist[key] != getattr(vm, key):
vm.persist[key] = getattr(vm, key)
vm.persist["idle"] = 0
# Set the idle time
if not vm.persist["idle"] and not vm.in_use:
vm.persist["idle"] = time.time()
# Ignore halted/stopping VM's
if vm.state != 'Running':
continue
# Ignore active and autostart VM's
if not vm.persist["idle"] or vm.autostart:
continue
# Continue if the idle treshold is not yet reached
idle_since = time.time() - vm.persist["idle"]
if idle_since < IDLE_TRESHOLD:
continue
# Check if any VM depends on this one
if any (x.name for x in vms if x.netvm == vm.name and x.state != 'Halted'):
continue
# Warn the user that this VM will be shut down soon
if vm.persist["user_warned"] != vm.persist["idle"]:
notify_send("Not used VM", "The VM %s will be stopped in %s seconds" % (
vm.name, STOP_SECONDS))
vm.persist["idle"] = time.time() - IDLE_TRESHOLD
vm.persist["user_warned"] = vm.persist["idle"]
continue
if idle_since < IDLE_TRESHOLD + STOP_SECONDS:
continue
# Stop the VM
notify_send("Stopping VM", "Stopping %s after %i minutes idle time" % (
vm.name, idle_since/60))
subprocess.check_output(["/usr/bin/qvm-shutdown", vm.name])
vm.persist["idle"] = time.time() - IDLE_TRESHOLD
vm.persist["user_warned"] = vm.persist["idle"]
with open("/run/qubes/vm-idle-state.json", "w") as f:
json.dump(PERSIST, f)
@maximilize
Copy link
Author

Run via a cronjob in dom0. The guest machines need to have the tool xwininfo installed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment