Skip to content

Instantly share code, notes, and snippets.

@zajdee
Last active November 4, 2023 09:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zajdee/8424ce31bc6addae2316ecba64c2960d to your computer and use it in GitHub Desktop.
Save zajdee/8424ce31bc6addae2316ecba64c2960d to your computer and use it in GitHub Desktop.
A simple Python daemon waiting for pref64
#!/usr/bin/env python3
"""Module to assist with capturing pref64 from RAs on the IPv6-mostly networks and acting upon it."""
import argparse
import ipaddress
import logging
import re
import signal
import struct
import socket
import subprocess
from datetime import datetime, timedelta
from pprint import pprint
from scapy.layers.inet6 import ICMPv6NDOptPrefixInfo, ICMPv6ND_RA, ICMPv6NDOptPREF64
"""
You need the latest scapy. The following commands should help you to get this script up and running in Ubuntu 22.04.
apt install python3-pip
pip3 install argparse
pip3 uninstall scapy
pip3 -vv install git+https://github.com/secdev/scapy.git@0474c37bf1d147c969173d52ab3ac76d2404d981
python3 pref64_ra_daemon.py
Next steps are probably to replicate clatd from Tore Anderson (https://github.com/toreanderson/clatd), e.g.:
1. Discover the prefix (already done from the RAs)
2. Discover the device facing the PLAT (ip --json -6 route get fd00:64:: | jq)
[
{
"dst": "fd00:64::",
"from": "::",
"gateway": "fe80::e859:9eff:fe11:2dbd",
"dev": "enp1s0",
"protocol": "ra",
"prefsrc": "2a03:c22:fe3b:64:b2a:1c35:c99f:2c2c", <--- this may change if the preferred local address changes
"metric": 100,
"flags": [],
"pref": "high"
}
]
3. Derive a CLAT IPv6 address from the IPv6 address on the discovered interface
4. Check if IPv4 connectivity is present - if not, proceed
5. Enable IPv6 forwarding
6. Enable Proxy-ND
7. Create the `clat` tun device
8. Add a default route via the `clat` device
9. Prepare a Tayga config and start it up
# cat /tmp/D8W3twOorY
tun-device clat
prefix 64:ff9b::/96
ipv4-addr 192.0.0.2
map 192.0.0.1 2001:db8:0:64:40da:c593:6378:0
10. Periodically verify if the pref64 is still valid
10.1. if not anymore (e.g. it has expired), tear down Tayga and remove the default IPv4 route
10.2. if the prefix has changed, tear down and start Tayga again
11. If the process is about to shut down, tear Tayga down too
TBD: How to listen on all interfaces that are up? (Do we even want this? Or do we want to listen just on the interfaces with an IPv6 connectivity?)
"""
# Matches one "inet6 <address>/<netmask> ... preferred_lft <forever|Nsec>" entry.
# DOTALL lets the lazy ".*?" run across a line break between the address and
# the "preferred_lft" token.
# NOTE(review): presumably meant for the text returned by launch_ip(); nothing
# in the visible code references this pattern yet.
IP_ADDR_MATCHER = re.compile(
    r"inet6 (?P<address>[0-9a-f:]+)/(?P<netmask>[0-9]{1,3}) .*? "
    r"preferred_lft (?P<preferred>(forever|\d+sec))",
    flags=re.I | re.MULTILINE | re.DOTALL,
)

# Multicast group the receiver joins.
IPV6_ALLNODES_MCAST = "ff02::1"

# Value compared against the first byte of each received datagram to decide
# whether it is a Router Advertisement.
ICMPV6_TYPE_RA = 134
class Terminating(Exception):
    """Signal that the application should shut down.

    Raised from the signal handler so that the blocking receive loop can be
    unwound and the process can exit cleanly.
    """
class StaticMAddrDaemon:
    """Listen for Router Advertisements on one interface and report pref64s.

    Typical use: construct, call ``parse_args()`` to select the interface,
    then call ``receiver()``, which blocks until a termination signal arrives.
    """

    # Name of the interface to listen on; populated by parse_args().
    iface = None

    def __init__(self):
        """Install the signal handlers and configure logging."""
        # SIGINT and SIGTERM share one handler, which raises Terminating so
        # that receiver() can unwind.
        signal.signal(signal.SIGINT, self.sigterm)
        signal.signal(signal.SIGTERM, self.sigterm)
        self.logging_init()

    def act_on_ra(self, packet):
        """Act upon RA reception.

        Walks the layers of the decoded Router Advertisement, decodes every
        PREF64 option found and prints it; prints the collected list once at
        the end when at least one was seen.

        Args:
            packet: decoded packet containing an ``ICMPv6ND_RA`` layer.
        """
        logging.debug("=== captured packet START ===")
        icmpv6 = packet[ICMPv6ND_RA]
        prefixes = []
        for payload in icmpv6.iterpayloads():
            # Only the PREF64 option is processed; every other layer is skipped.
            if not isinstance(payload, ICMPv6NDOptPREF64):
                continue
            try:
                prefix = self.process_pref64(payload[ICMPv6NDOptPREF64])
            except LookupError:
                # The prefix length code comes straight from the network and
                # may have no entry in the lookup table. Skip that option
                # rather than letting one bad packet stop the daemon.
                logging.warning(
                    "Ignoring pref64 option with unknown prefix length code %r",
                    payload.plc,
                )
                continue
            print(f"Captured a pref64: {prefix}")
            prefixes.append(prefix)
        if prefixes:
            print(f"Captured pref64s: {prefixes}")
        logging.debug("=== captured packet END ===")

    def process_pref64(self, payload):
        """Decode the captured pref64 data.

        Args:
            payload: a PREF64 option layer exposing ``scaledlifetime``,
                ``plc`` and ``prefix``.

        Returns:
            dict with ``"expiry"`` (naive local ``datetime`` at which the
            prefix stops being valid) and ``"pref64"`` (prefix followed by
            its length suffix).

        Raises:
            LookupError: if ``payload.plc`` has no entry in the prefix length
                lookup table.
        """
        # The scaled lifetime is multiplied by 8 to obtain seconds.
        lifetime = payload.scaledlifetime * 8
        # Map the prefix length code to its textual suffix.
        # NOTE(review): assumes i2s yields strings such as "/96" -- confirm
        # against the installed scapy version.
        netmask = ICMPv6NDOptPREF64.plc.i2s[payload.plc]
        expiry = datetime.now() + timedelta(seconds=lifetime)
        return {
            "expiry": expiry,
            "pref64": f"{payload.prefix}{netmask}",
        }

    def decode_ra(self, data):
        """Decode the captured RA.

        Args:
            data: raw bytes as returned by the socket.

        Returns:
            The decoded ``ICMPv6ND_RA`` packet, or ``None`` when ``data`` is
            empty, is not a Router Advertisement, or cannot be decoded.
        """
        if not data:
            return None
        if data[0] != ICMPV6_TYPE_RA:
            # Not the RA we're looking for
            return None
        try:
            return ICMPv6ND_RA(data)
        except Exception:  # the decoder may raise many different error types
            # Malformed input must not stop the daemon, but leave a trace of
            # it instead of dropping the packet silently.
            logging.debug("Failed to decode RA", exc_info=True)
            return None

    def receiver(self):
        """Multicast packet receiver function.

        Opens a raw ICMPv6 socket bound to ``self.iface``, joins the
        all-nodes multicast group and processes received datagrams until
        ``Terminating`` is raised. The socket is closed on every exit path.

        Raises:
            ValueError: if no interface has been configured.
        """
        if not self.iface:
            raise ValueError("no interface configured; call parse_args() first")
        logging.debug("start for iface %s", self.iface)
        # Resolve the group address to learn the address family to use.
        addrinfo = socket.getaddrinfo(IPV6_ALLNODES_MCAST, None)[0]
        family = addrinfo[0]
        group_address = addrinfo[4][0]
        # Prefer the named constant; fall back to 25 (the previously
        # hard-coded value) where the socket module does not expose it.
        bind_to_device = getattr(socket, "SO_BINDTODEVICE", 25)
        try:
            with socket.socket(
                family, socket.SOCK_RAW, socket.getprotobyname("ipv6-icmp")
            ) as mcast_socket:
                # Restrict the socket to the selected interface.
                mcast_socket.setsockopt(
                    socket.SOL_SOCKET,
                    bind_to_device,
                    (self.iface + "\0").encode("utf-8"),
                )
                # Allow multiple copies of this program on one machine
                mcast_socket.setsockopt(
                    socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
                )
                # Join group: packed group address plus interface index 0.
                group_bin = socket.inet_pton(family, group_address)
                mreq = group_bin + struct.pack("@I", 0)
                mcast_socket.setsockopt(
                    socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq
                )
                # Loop, processing data we receive
                while True:
                    data, _ = mcast_socket.recvfrom(1500)
                    advertisement = self.decode_ra(data)
                    if not advertisement:
                        continue
                    self.act_on_ra(advertisement)
        except Terminating:
            # Also covers a signal that arrives during socket setup.
            logging.warning("Terminated, exitting gracefully...")

    def launch_ip(self, iface):
        """Launches the `ip` command.

        Args:
            iface: name of the interface to query.

        Returns:
            Standard output of ``ip -6 address show dev <iface> scope
            global``, decoded as UTF-8. The exit status is not checked.
        """
        # Build argv as a list so the interface name is always passed as
        # exactly one argument, even if it contains whitespace.
        cmd = ["ip", "-6", "address", "show", "dev", str(iface), "scope", "global"]
        result = subprocess.run(cmd, stdout=subprocess.PIPE, check=False)
        return result.stdout.decode("utf-8")

    def logging_init(self):
        """Initialize logging."""
        logging.basicConfig(
            level=logging.DEBUG,
            datefmt="%Y-%m-%d %H:%M:%S",
            format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - "
            "%(funcName)s: %(message)s",
        )

    def parse_args(self, argv=None):
        """Parse arguments.

        Args:
            argv: optional list of argument strings; when ``None`` the
                process command line is used.

        Returns:
            The parsed ``argparse.Namespace``. As a side effect the chosen
            interface is stored in ``self.iface``.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("-i", "--interface", type=str, required=True)
        args = parser.parse_args(argv)
        self.iface = args.interface
        return args

    @staticmethod
    def sigterm(signal_number, stack_frame):
        """Terminate gracefully on a signal.

        Raises:
            Terminating: always, to unwind the receive loop.
        """
        raise Terminating()
def main():
    """Create the daemon, read the command line and run the receive loop."""
    daemon = StaticMAddrDaemon()
    daemon.parse_args()
    # Blocks until the daemon is told to terminate.
    daemon.receiver()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment