Skip to content

Instantly share code, notes, and snippets.

@tonusoo
Last active February 17, 2024 09:25
Show Gist options
  • Save tonusoo/6051bfbf0a0740dee62c4e0b0ed4e2ab to your computer and use it in GitHub Desktop.
Save tonusoo/6051bfbf0a0740dee62c4e0b0ed4e2ab to your computer and use it in GitHub Desktop.
netfilter mailinglist message: https://marc.info/?l=netfilter&m=170803541705534
#!/usr/bin/python3 -u
"""
Title : send-dst-unreach
Last modified date : 14.02.2024
Author : Martin Tonusoo
Description : Reads packets from Netfilter queue, sends the
ICMPv6 type "Destination Unreachable" code
"Source address failed ingress/egress policy"
as a response to those packets and finally
drops the initial packet received from the
Netfilter queue.
Options :
Notes : Script is meant to be run as an "exec" type systemd
service.
Implements the RFC 7084(Basic Requirements
for IPv6 Customer Edge Routers) section 4.3
L-14.
"""
import sys
import logging
from logging.handlers import SysLogHandler
from socket import if_indextoname
from netfilterqueue import NetfilterQueue
from scapy.sendrecv import sendp
from scapy.arch import get_if_hwaddr, get_if_addr6
from scapy.layers.l2 import Ether
from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
# Script logs to syslog in order to preserve the journalctl
# capability to filter and color the messages based on the
# severity level.
syslog = SysLogHandler(address="/dev/log", facility=SysLogHandler.LOG_USER)
logging.basicConfig(
level=logging.DEBUG, format="%(levelname)s: %(message)s", handlers=[syslog]
)
def get_local_ipv6_addr(iface):
"""Finds an IPv6 address on interface."""
# Return a global unicast address.
gua = get_if_addr6(iface)
if gua:
return gua
# Return a link-local address if no GUA addresses were found.
try:
with open("/proc/net/if_inet6", "r", encoding="ascii") as f:
for line in f:
parts = line.split()
# Last field is an interface name.
if parts[-1] == iface:
# First field is the IPv6 address.
if parts[0].startswith("fe80"):
lladdr = parts[0]
lladdr = [
lladdr[i : i + 4] for i in range(0, len(lladdr), 4)
]
return ":".join(lladdr)
except OSError as err:
logging.warning(f"Error reading /proc/net/if_inet6: {err!r}")
# Return the unspecified address if link-local address was not found.
return "::"
def get_mtu(iface):
"""Finds the MTU of the interface."""
try:
with open(f"/sys/class/net/{iface}/mtu", "r", encoding="ascii") as f:
return int(f.read().strip())
except OSError as err:
logging.error(f"Error reading /sys/class/net/{iface}/mtu: {err!r}")
return None
def send_dst_unreach(pkt):
"""Sends the ICMPv6 "Destination Unreachable" to packet source.
Sends the ICMPv6 type 1("Destination Unreachable")
code 5("Source address failed ingress/egress policy")
message as a response to invoking packet after which
the invoking packet itself is dropped.
"""
try:
iface = if_indextoname(pkt.indev)
except OSError as err:
logging.error(
f'Interface name for ifindex "{pkt.indev}" not found: {err!r}'
)
# No point to continue.
sys.exit(1)
# Alternatively, one could process the bytestring directly. For example:
#
# ":".join([f"{byte:02x}" for byte in pkt.get_hw()[:6]])
dst_mac_addr = Ether(pkt.get_hw()[:6]).dst
# For an interface with no MAC address, a 00:00:00:00:00:00
# is returned.
src_mac_addr = get_if_hwaddr(iface)
src_ipv6_addr = get_local_ipv6_addr(iface)
payload = pkt.get_payload()
# Alternatively, one could process the "payload[8:24].hex()" string.
dst_ipv6_addr = IPv6(payload).src
# According to RFC 4443 section 3.1 the Destination Unreachable
# message should contain as much of invoking packet as possible
# without the ICMPv6 packet exceeding the minimum IPv6 MTU.
iface_mtu = get_mtu(iface)
if not iface_mtu:
# If for whatever reason the MTU was not found, then
# the payload will simply contain no invoking packet data.
iface_mtu = 0
payload_len = min(iface_mtu, 1280 - len(IPv6() / ICMPv6DestUnreach()))
logging.debug(
f'Sending ICMPv6 "Destination Unreachable" from {src_ipv6_addr} '
f"(MAC: {src_mac_addr}) to {dst_ipv6_addr} (MAC: {dst_mac_addr}) "
f"via {iface}"
)
dest_unreach = (
Ether(
dst=dst_mac_addr,
src=src_mac_addr,
)
/ IPv6(
src=src_ipv6_addr,
dst=dst_ipv6_addr,
)
/ ICMPv6DestUnreach(
# 5 - Source address failed ingress/egress policy
code=5,
)
/ payload[:payload_len]
)
sendp(dest_unreach, iface=iface, verbose=0)
pkt.drop()
nfqueue = NetfilterQueue()
# Bind to Netfilter queue number 10.
# If incoming packet rate is higher than the rate which the program is
# able to reply and the queue becomes full(1024 packets; details can be seen
# with "cat /proc/net/netfilter/nfnetlink_queue"), then the new packets
# will be dropped.
nfqueue.bind(10, send_dst_unreach)
try:
nfqueue.run()
except KeyboardInterrupt:
pass
nfqueue.unbind()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment