Skip to content

Instantly share code, notes, and snippets.

@mentha
Created May 3, 2022 13:35
Show Gist options
  • Save mentha/1b9a4a9afffe0ce7de2ba034226bb624 to your computer and use it in GitHub Desktop.
Save mentha/1b9a4a9afffe0ce7de2ba034226bb624 to your computer and use it in GitHub Desktop.
assign static ip and network filters to libvirt guests
#!/usr/bin/env python3
from pprint import pprint
from argparse import ArgumentParser
from ipaddress import IPv4Address
from tempfile import NamedTemporaryFile
import libvirt
import os
import xml.etree.ElementTree as ET
PROGNAME = 'libvirt-assign-ip'
def strlist(a):
return list(filter(None, a.split(',')))
parser = ArgumentParser(description='Assign static IPv4 addresses and network filters to managed libvirt virtual machines.')
parser.add_argument('--connect', '-c', metavar='URI', dest='uri', default=None, action='store', help='Connect to specified URI')
parser.add_argument('--dry-run', '-d', dest='dry', action='store_true', help='Do not change anything')
parser.add_argument('--reset', '-r', dest='reset', action='store_true', help='Reset all addresses')
parser.add_argument('--hosts', '-o', metavar='HOSTS', dest='hosts', action='store', help='Path to hosts file to update, if specified')
parser.add_argument('--network', '-n', metavar='NET', dest='nets', default=[], action='extend', type=strlist, help='Manage specified networks')
parser.add_argument('--no-network', metavar='NET', dest='nonets', default=[], action='extend', type=strlist, help='Skip specified networks')
a = parser.parse_args()
conn = libvirt.open(a.uri)
nets = {}
netdoms = set()
netxml = {}
subnets = {}
mac2net = {}
allocated = {}
for n in conn.listAllNetworks(libvirt.VIR_CONNECT_LIST_NETWORKS_PERSISTENT):
nn = n.name()
if nn in a.nonets or (a.nets and nn not in a.nets):
continue
xn = ET.fromstring(n.XMLDesc())
netxml[nn] = xn
if xn.find('domain[@name]') is None:
continue
netdoms.add(xn.find('domain').get('name'))
subnet = None
for ip in xn.findall('ip'):
if ip.get('family', 'ipv4') != 'ipv4':
continue
if ip.find('dhcp/range') is None:
continue
subnet = ip
break
if subnet:
subnets[nn] = subnet
allocated[nn] = set()
if not a.reset:
for h in subnet.findall('dhcp/host'):
mac2net[h.get('mac').lower()] = n
allocated[nn].add(IPv4Address(h.get('ip')))
nets[nn] = n
doms = []
domxml = {}
mac2dom = {}
for d in conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_PERSISTENT):
found = False
xd = ET.fromstring(d.XMLDesc())
domxml[d.name()] = xd
for intf in xd.findall('devices/interface[@type="network"]'):
if intf.find('source').get('network') not in subnets:
continue
mac2dom[intf.find('mac').get('address').lower()] = d
found = True
if found:
doms.append(d)
# clear old net host ent
def update_net(net, *a):
try:
net.update(*a, libvirt.VIR_NETWORK_UPDATE_AFFECT_LIVE | libvirt.VIR_NETWORK_UPDATE_AFFECT_CONFIG)
except libvirt.libvirtError:
net.update(*a, libvirt.VIR_NETWORK_UPDATE_AFFECT_CONFIG)
for n in nets.values():
dhcp = subnets[n.name()].find('dhcp')
rl = []
for h in dhcp.findall('host'):
if a.reset or h.get('mac').lower() not in mac2dom:
rl.append(h)
for h in rl:
print(f'Removing unused host entry ({h.get("ip")}, {h.get("mac")}, "{h.get("name")}") from {n.name()}')
dhcp.remove(h)
if not a.reset:
allocated[n.name()].remove(IPv4Address(h.get('ip')))
if not a.dry:
update_net(n,
libvirt.VIR_NETWORK_UPDATE_COMMAND_DELETE, libvirt.VIR_NETWORK_SECTION_IP_DHCP_HOST, -1,
ET.tostring(h, encoding='unicode'))
# assign ips
def update_domdev(dom, *a):
try:
dom.updateDeviceFlags(*a, libvirt.VIR_DOMAIN_DEVICE_MODIFY_LIVE | libvirt.VIR_DOMAIN_DEVICE_MODIFY_CONFIG)
except libvirt.libvirtError:
dom.updateDeviceFlags(*a, libvirt.VIR_DOMAIN_DEVICE_MODIFY_CONFIG)
for d in doms:
xd = domxml[d.name()]
for intf in xd.findall('devices/interface[@type="network"]'):
netname = intf.find('source').get('network')
if netname not in subnets:
continue
subnet = subnets[netname]
mac = intf.find('mac').get('address').lower()
ip = None
if mac in mac2net:
h = subnet.find(f'dhcp/host[@mac="{mac}"]')
ip = IPv4Address(h.get('ip'))
h.set('name', d.name())
else:
ip = IPv4Address(subnet.get('address')) + 1
high = IPv4Address(subnet.find('dhcp/range').get('end'))
while ip <= high:
if ip not in allocated[netname]:
break
ip += 1
allocated[netname].add(ip)
h = ET.Element('host')
h.set('mac', mac)
h.set('name', d.name())
h.set('ip', ip.compressed)
subnet.find('dhcp').append(h)
n = nets[netname]
print(f'Adding host entry ({h.get("ip")}, {h.get("mac")}, "{h.get("name")}") to {n.name()}')
if not a.dry:
update_net(n,
libvirt.VIR_NETWORK_UPDATE_COMMAND_ADD_LAST, libvirt.VIR_NETWORK_SECTION_IP_DHCP_HOST, -1,
ET.tostring(h, encoding='unicode'))
oldf = list(intf.findall('filterref[@filter="clean-traffic"]'))
filtparm = (
('CTRL_IP_LEARNING', 'none'),
('IP', ip.compressed))
if len(oldf) == 1:
f = oldf[0]
if len(f.findall('parameter')) == len(filtparm):
nomod = True
for n, v in filtparm:
p = f.find(f'parameter[@name="{n}"]')
if p is None or p.get('value') != v:
nomod = False
break
if nomod:
continue
for f in oldf:
intf.remove(f)
f = ET.Element('filterref')
f.set('filter', 'clean-traffic')
for n, v in filtparm:
p = ET.Element('parameter')
p.set('name', n)
p.set('value', v)
f.append(p)
intf.append(f)
print(f'Configuring network filter of "{d.name()}" ({mac})')
if not a.dry:
update_domdev(d, ET.tostring(intf, encoding='unicode'))
# modify hosts
if a.hosts:
c_begin = f'#BEGIN MANAGED HOSTS BY {PROGNAME}'
c_end = f'#END MANAGED HOSTS BY {PROGNAME}'
print('Updating hosts')
f = None
if a.dry:
class DryWrite:
def __init__(self):
self.buf = ''
def __enter__(self, *a):
return self
def __exit__(self, *a):
self.line(self.buf.strip())
def write(self, a):
self.buf += a
s = self.buf.split('\n', 1)
if len(s) >= 2:
self.line(s[0])
self.buf = s[1]
def line(self, a):
print(f'Written {a.strip()}')
f = DryWrite()
else:
f = NamedTemporaryFile('w', delete=False, dir=os.path.dirname(a.hosts), prefix=f'new-{os.path.basename(a.hosts)}-')
delete_f = True
try:
aliases = {}
try:
with open(a.hosts) as rf:
managed = False
for l in rf.readlines():
l = l.strip()
if l.startswith(c_begin):
managed = True
elif l.startswith(c_end):
managed = False
elif not managed:
f.write(l)
f.write('\n')
else:
l = l.split()[1:]
extra = set()
for n in l:
tld = n.rsplit('.', 1)[-1]
if tld not in netdoms:
extra.add(n)
else:
aliases[n] = extra
except FileNotFoundError:
pass
f.write(c_begin)
f.write('\n')
for n in nets.values():
dhcp = subnets[n.name()].find('dhcp')
dom = netxml[n.name()].find('domain').get('name')
for h in dhcp.findall('host'):
hn = h.get('name') + '.' + dom
nl = [hn]
if hn in aliases:
nl.extend(sorted(aliases[hn]))
f.write(h.get('ip'))
f.write(' ')
f.write(' '.join(nl))
f.write('\n')
f.write(c_end)
f.write('\n')
if not a.dry:
os.fchmod(f.fileno(), 0o644)
os.rename(f.name, a.hosts)
delete_f = False
finally:
if a.dry:
print(f'Would have replaced {a.hosts} with new hosts')
elif delete_f:
os.unlink(f.name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment