Skip to content

Instantly share code, notes, and snippets.

@desbma
Last active August 26, 2015 15:06
Show Gist options
  • Save desbma/f68e118d5fce578f3159 to your computer and use it in GitHub Desktop.
Save desbma/f68e118d5fce578f3159 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import os
import threading
import unittest
import scapy.all
REMOTE_MAC = "xx:xx:xx:xx:xx:xx"
REMOTE_IP = "2015::2"
LOCAL_ITF = "eth5"
LOCAL_IP = "2015::1"
class SniffThread(threading.Thread):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.received = []
super().__init__()
def run(self):
self.received.extend(scapy.all.sniff(*self.args, **self.kwargs))
class TestPingScapySrpIssue(unittest.TestCase):
def test_captureWithSniff(self):
for ping_data_size in (0, 8):
with self.subTest(ping_data_size=ping_data_size):
# build eth frame
eth = scapy.all.Ether(src=scapy.all.get_if_hwaddr(LOCAL_ITF),
dst=REMOTE_MAC)
ipv6 = scapy.all.IPv6(src=LOCAL_IP,
dst=REMOTE_IP)
icmp = scapy.all.ICMPv6EchoRequest()
ping_data = os.urandom(ping_data_size)
frame = eth / ipv6 / icmp / ping_data
# send it and get response
sniff_thread = SniffThread(timeout=1,
iface=LOCAL_ITF,
lfilter=lambda x: x.haslayer("ICMPv6EchoReply"))
sniff_thread.start()
scapy.all.sendp(frame, iface=LOCAL_ITF, verbose=False)
sniff_thread.join()
# build expected response
expected_eth = scapy.all.Ether(src=REMOTE_MAC,
dst=scapy.all.get_if_hwaddr(LOCAL_ITF))
expected_ipv6 = scapy.all.IPv6(src=REMOTE_IP,
dst=LOCAL_IP)
expected_icmp = scapy.all.ICMPv6EchoReply()
expected_frame = expected_eth / expected_ipv6 / expected_icmp / ping_data
# check response
self.assertEqual(len(sniff_thread.received), 1)
print("reply received", scapy.all.hexstr(bytes(sniff_thread.received[0])))
print("reply expected", scapy.all.hexstr(bytes(expected_frame)))
self.assertEqual(bytes(sniff_thread.received[0]), bytes(expected_frame))
def test_captureWithSrp(self):
for ping_data_size in (0, 8):
with self.subTest(ping_data_size=ping_data_size):
# build eth frame
eth = scapy.all.Ether(src=scapy.all.get_if_hwaddr(LOCAL_ITF),
dst=REMOTE_MAC)
ipv6 = scapy.all.IPv6(src=LOCAL_IP,
dst=REMOTE_IP)
icmp = scapy.all.ICMPv6EchoRequest()
ping_data = os.urandom(ping_data_size)
frame = eth / ipv6 / icmp / ping_data
# send it and get response
answered, unanswered = scapy.all.srp(frame,
iface=LOCAL_ITF,
timeout=1,
verbose=False)
# build expected response
expected_eth = scapy.all.Ether(src=REMOTE_MAC,
dst=scapy.all.get_if_hwaddr(LOCAL_ITF))
expected_ipv6 = scapy.all.IPv6(src=REMOTE_IP,
dst=LOCAL_IP)
expected_icmp = scapy.all.ICMPv6EchoReply()
expected_frame = expected_eth / expected_ipv6 / expected_icmp / ping_data
# check response
self.assertEqual(len(answered), 1) # fails here if ping_data_size != 0
self.assertEqual(bytes(answered[0][1]), bytes(expected_frame))
if __name__ == "__main__":
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment