Last active
February 17, 2024 09:25
-
-
Save tonusoo/6051bfbf0a0740dee62c4e0b0ed4e2ab to your computer and use it in GitHub Desktop.
netfilter mailinglist message: https://marc.info/?l=netfilter&m=170803541705534
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 -u | |
""" | |
Title : send-dst-unreach | |
Last modified date : 14.02.2024 | |
Author : Martin Tonusoo | |
Description : Reads packets from Netfilter queue, sends the | |
ICMPv6 type "Destination Unreachable" code | |
"Source address failed ingress/egress policy" | |
as a response to those packets and finally | |
drops the initial packet received from the | |
Netfilter queue. | |
Options : | |
Notes : Script is meant to be run as an "exec" type systemd | |
service. | |
Implements the RFC 7084(Basic Requirements | |
for IPv6 Customer Edge Routers) section 4.3 | |
L-14. | |
""" | |
import sys | |
import logging | |
from logging.handlers import SysLogHandler | |
from socket import if_indextoname | |
from netfilterqueue import NetfilterQueue | |
from scapy.sendrecv import sendp | |
from scapy.arch import get_if_hwaddr, get_if_addr6 | |
from scapy.layers.l2 import Ether | |
from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach | |
# Script logs to syslog in order to preserve the journalctl | |
# capability to filter and color the messages based on the | |
# severity level. | |
syslog = SysLogHandler(address="/dev/log", facility=SysLogHandler.LOG_USER) | |
logging.basicConfig( | |
level=logging.DEBUG, format="%(levelname)s: %(message)s", handlers=[syslog] | |
) | |
def get_local_ipv6_addr(iface): | |
"""Finds an IPv6 address on interface.""" | |
# Return a global unicast address. | |
gua = get_if_addr6(iface) | |
if gua: | |
return gua | |
# Return a link-local address if no GUA addresses were found. | |
try: | |
with open("/proc/net/if_inet6", "r", encoding="ascii") as f: | |
for line in f: | |
parts = line.split() | |
# Last field is an interface name. | |
if parts[-1] == iface: | |
# First field is the IPv6 address. | |
if parts[0].startswith("fe80"): | |
lladdr = parts[0] | |
lladdr = [ | |
lladdr[i : i + 4] for i in range(0, len(lladdr), 4) | |
] | |
return ":".join(lladdr) | |
except OSError as err: | |
logging.warning(f"Error reading /proc/net/if_inet6: {err!r}") | |
# Return the unspecified address if link-local address was not found. | |
return "::" | |
def get_mtu(iface): | |
"""Finds the MTU of the interface.""" | |
try: | |
with open(f"/sys/class/net/{iface}/mtu", "r", encoding="ascii") as f: | |
return int(f.read().strip()) | |
except OSError as err: | |
logging.error(f"Error reading /sys/class/net/{iface}/mtu: {err!r}") | |
return None | |
def send_dst_unreach(pkt): | |
"""Sends the ICMPv6 "Destination Unreachable" to packet source. | |
Sends the ICMPv6 type 1("Destination Unreachable") | |
code 5("Source address failed ingress/egress policy") | |
message as a response to invoking packet after which | |
the invoking packet itself is dropped. | |
""" | |
try: | |
iface = if_indextoname(pkt.indev) | |
except OSError as err: | |
logging.error( | |
f'Interface name for ifindex "{pkt.indev}" not found: {err!r}' | |
) | |
# No point to continue. | |
sys.exit(1) | |
# Alternatively, one could process the bytestring directly. For example: | |
# | |
# ":".join([f"{byte:02x}" for byte in pkt.get_hw()[:6]]) | |
dst_mac_addr = Ether(pkt.get_hw()[:6]).dst | |
# For an interface with no MAC address, a 00:00:00:00:00:00 | |
# is returned. | |
src_mac_addr = get_if_hwaddr(iface) | |
src_ipv6_addr = get_local_ipv6_addr(iface) | |
payload = pkt.get_payload() | |
# Alternatively, one could process the "payload[8:24].hex()" string. | |
dst_ipv6_addr = IPv6(payload).src | |
# According to RFC 4443 section 3.1 the Destination Unreachable | |
# message should contain as much of invoking packet as possible | |
# without the ICMPv6 packet exceeding the minimum IPv6 MTU. | |
iface_mtu = get_mtu(iface) | |
if not iface_mtu: | |
# If for whatever reason the MTU was not found, then | |
# the payload will simply contain no invoking packet data. | |
iface_mtu = 0 | |
payload_len = min(iface_mtu, 1280 - len(IPv6() / ICMPv6DestUnreach())) | |
logging.debug( | |
f'Sending ICMPv6 "Destination Unreachable" from {src_ipv6_addr} ' | |
f"(MAC: {src_mac_addr}) to {dst_ipv6_addr} (MAC: {dst_mac_addr}) " | |
f"via {iface}" | |
) | |
dest_unreach = ( | |
Ether( | |
dst=dst_mac_addr, | |
src=src_mac_addr, | |
) | |
/ IPv6( | |
src=src_ipv6_addr, | |
dst=dst_ipv6_addr, | |
) | |
/ ICMPv6DestUnreach( | |
# 5 - Source address failed ingress/egress policy | |
code=5, | |
) | |
/ payload[:payload_len] | |
) | |
sendp(dest_unreach, iface=iface, verbose=0) | |
pkt.drop() | |
nfqueue = NetfilterQueue() | |
# Bind to Netfilter queue number 10. | |
# If incoming packet rate is higher than the rate which the program is | |
# able to reply and the queue becomes full(1024 packets; details can be seen | |
# with "cat /proc/net/netfilter/nfnetlink_queue"), then the new packets | |
# will be dropped. | |
nfqueue.bind(10, send_dst_unreach) | |
try: | |
nfqueue.run() | |
except KeyboardInterrupt: | |
pass | |
nfqueue.unbind() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment