Created
April 9, 2020 22:45
-
-
Save leandrolanzieri/b3ad9c2b204b813103d0ace88b5afde9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2018 Martine Lenders <m.lenders@fu-berlin.de>
#
# Distributed under terms of the MIT license.
import argparse
import queue
import threading
import time
from collections import Counter

from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.layers.l2 import Ether
from scapy.sendrecv import sniff, sendp
# Size limit in bytes that the nested IPv6 packets built by max_enc() must
# stay below.
ETHERNET_MTU = 1500
# ICMPv6 echo identifier of the final request; a reply carrying it stops the
# sniffer.
STOP_ID = 0x1234
# ICMPv6 echo identifier carried by the actual test requests and replies.
TEST_ID = 0x1
# Hands the list of captured packets from the sniffer thread to the sender.
packet_queue = queue.Queue()
def sniff_for_echo_reply(iface):
    """
    Capture packets on ``iface`` until the stop echo reply shows up.

    Sniffing is stopped by the first packet that contains an ICMPv6 echo
    reply whose identifier equals ``STOP_ID``.  The captured packets are then
    put on ``packet_queue`` as a single item, which is immediately marked as
    done.

    :param iface: name of the interface to sniff on
    """
    def is_stop_reply(pkt):
        # only echo replies can terminate the capture
        if not pkt.haslayer(ICMPv6EchoReply):
            return False
        return pkt[ICMPv6EchoReply].id == STOP_ID

    captured = sniff(iface=iface, stop_filter=is_stop_reply)
    packet_queue.put(captured)
    # task_done() is called on the producer side, so a join() on the queue
    # does not depend on the consumer acknowledging the item
    packet_queue.task_done()
def max_enc():
    """
    Determine how many IPv6 headers can be nested inside each other while the
    resulting packet still stays below the Ethernet MTU.

    Every header is counted with the length of a default ``IPv6()`` header;
    8 further bytes are reserved on top of the headers (presumably for the
    ICMPv6 echo header that terminates the chain).

    :return: the largest count ``n`` for which
             ``n * len(IPv6()) + 8 < ETHERNET_MTU`` holds, 0 if not even a
             single header fits
    """
    hdr_len = len(IPv6())
    count = 0
    # accept another header as long as the grown packet stays below the MTU
    while ((count + 1) * hdr_len) + 8 < ETHERNET_MTU:
        count += 1
    return count
def send_enc_ipv6(iface, dst_ipv6, src_ipv6, dst_ether, src_ether=None):
    """
    Send ICMPv6 echo requests wrapped in an increasing number of nested IPv6
    headers and check the echo replies captured on ``iface``.

    The request with sequence number ``n`` (counting from 0) is wrapped in
    ``n + 1`` IPv6 headers, up to ``max_enc()`` headers.  A final request
    with a single IPv6 header and identifier ``STOP_ID`` is sent afterwards;
    its reply makes the sniffer thread stop and hand over its capture.

    :param iface: name of the interface to send and sniff on
    :param dst_ipv6: destination address used in every IPv6 header
    :param src_ipv6: source address used in every IPv6 header
    :param dst_ether: destination address of the Ethernet frames
    :param src_ether: source address of the Ethernet frames; ``None`` is
                      passed on to scapy's ``Ether`` unchanged
    :raises AssertionError: if no test request was captured, if more replies
        than requests were captured, if 96% or less of the requests were
        answered, or if a sequence number was answered more than once
    """
    max_pkt = max_enc()
    ip = IPv6(src=src_ipv6, dst=dst_ipv6)
    sniffer = threading.Thread(target=sniff_for_echo_reply, args=(iface,))
    sniffer.start()
    time.sleep(0.1)     # wait a bit for sniffer to start
    # grow the header chain by one IPv6 header per packet instead of
    # rebuilding it from scratch for every sequence number; `/` returns a new
    # packet, so `ip` itself is never modified
    ips = ip
    for seq in range(max_pkt):
        sendp(Ether(dst=dst_ether, src=src_ether) / ips /
              ICMPv6EchoRequest(id=TEST_ID, seq=seq), iface=iface, verbose=0)
        ips = ips / ip
        time.sleep(0.001)
    # send stop condition for sniffer
    sendp(Ether(dst=dst_ether, src=src_ether) / ip /
          ICMPv6EchoRequest(id=STOP_ID), iface=iface, verbose=0)
    # wait for sniffer results; the sniffer thread marks the queue item as
    # done itself, so join() only waits for that to have happened
    ps = packet_queue.get()
    packet_queue.join()
    sniffer.join()
    requests = [p for p in ps if
                p.haslayer(ICMPv6EchoRequest) and
                (p[ICMPv6EchoRequest].id == TEST_ID)]
    replies = [p for p in ps if
               p.haslayer(ICMPv6EchoReply) and
               (p[ICMPv6EchoReply].id == TEST_ID)]
    # without this guard an empty capture would surface as an unrelated
    # ZeroDivisionError in the ratio check below
    assert requests, "no echo requests with the test ID were captured"
    assert len(replies) <= len(requests)
    assert (len(replies) / len(requests)) > .96
    # no sequence number occurs more than once in the replies
    seq_counts = Counter(rep[ICMPv6EchoReply].seq for rep in replies)
    duplicates = sorted(seq for seq, occ in seq_counts.items() if occ > 1)
    assert not duplicates, \
        "duplicate echo replies for seq {}".format(duplicates)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # mandatory positional arguments, in the order they have to be given
    for name in ("iface", "dst_ipv6", "src_ipv6", "dst_ether"):
        parser.add_argument(name)
    # the Ethernet source address may be omitted on the command line
    parser.add_argument("src_ether", nargs="?", default=None)
    cli = parser.parse_args()
    send_enc_ipv6(
        iface=cli.iface,
        dst_ipv6=cli.dst_ipv6,
        src_ipv6=cli.src_ipv6,
        dst_ether=cli.dst_ether,
        src_ether=cli.src_ether,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment