Skip to content

Instantly share code, notes, and snippets.

@leandrolanzieri
Created April 9, 2020 22:45
Show Gist options
  • Save leandrolanzieri/b3ad9c2b204b813103d0ace88b5afde9 to your computer and use it in GitHub Desktop.
Save leandrolanzieri/b3ad9c2b204b813103d0ace88b5afde9 to your computer and use it in GitHub Desktop.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2018 Martine Lenders <m.lenders@fu-berlin.de>
#
# Distributed under terms of the MIT license.
import argparse
import queue
import time
import threading
from scapy.layers.l2 import Ether
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.sendrecv import sniff, sendp
ETHERNET_MTU = 1500
STOP_ID = 0x1234
TEST_ID = 0x1
packet_queue = queue.Queue()
def sniff_for_echo_reply(iface):
def stop_filter(p):
return p.haslayer(ICMPv6EchoReply) and \
(p[ICMPv6EchoReply].id == STOP_ID)
ps = sniff(stop_filter=stop_filter, iface=iface)
packet_queue.put(ps)
packet_queue.task_done()
def max_enc():
"""
Calculates the maximum number of IPv6 headers that can be encapsulated
within each other without exceeding the MTU of Ethernet.
"""
i = 1
IP_HDR_LEN = len(IPv6())
while ((i * IP_HDR_LEN) + 8) < ETHERNET_MTU:
i += 1
return i - 1
def send_enc_ipv6(iface, dst_ipv6, src_ipv6, dst_ether, src_ether=None):
max_pkt = max_enc()
ip = IPv6(src=src_ipv6, dst=dst_ipv6)
sniffer = threading.Thread(target=sniff_for_echo_reply, args=(iface,))
sniffer.start()
time.sleep(0.1) # wait a bit for sniffer to start
for i in range(max_pkt):
ips = ip
for _ in range(i):
ips /= ip
sendp(Ether(dst=dst_ether, src=src_ether) / ips /
ICMPv6EchoRequest(id=TEST_ID, seq=i), iface=iface, verbose=0)
time.sleep(0.001)
# send stop condition for sniffer
sendp(Ether(dst=dst_ether, src=src_ether) / ip /
ICMPv6EchoRequest(id=STOP_ID), iface=iface, verbose=0)
# wait for sniffer results
ps = packet_queue.get()
packet_queue.join()
sniffer.join()
requests = [p for p in ps if
p.haslayer(ICMPv6EchoRequest) and
(p[ICMPv6EchoRequest].id == TEST_ID)]
replies = [p for p in ps if
p.haslayer(ICMPv6EchoReply) and
(p[ICMPv6EchoReply].id == TEST_ID)]
assert(len(replies) <= len(requests))
assert((len(replies) / len(requests)) > .96)
reply_seqs = [rep[ICMPv6EchoReply].seq for rep in replies]
reply_seq_occurences = [reply_seqs.count(seq) for seq in reply_seqs]
# no sequence number occurs more than once in the replies
assert(not any(occ > 1 for occ in reply_seq_occurences))
if __name__ == "__main__":
p = argparse.ArgumentParser()
p.add_argument("iface")
p.add_argument("dst_ipv6")
p.add_argument("src_ipv6")
p.add_argument("dst_ether")
p.add_argument("src_ether", default=None, nargs="?")
args = p.parse_args()
send_enc_ipv6(**vars(args))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment