Created
March 12, 2018 20:19
-
-
Save speakinghedge/9f98f402808680b15215e404ef5f7ea8 to your computer and use it in GitHub Desktop.
use socketpair to test scapy sniff()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from scapy.all import * | |
class FooSocket(Thread, SuperSocket):
    """Fake capture socket that is fed by its own background sender thread.

    One end of a UNIX datagram socket pair is exposed to the consumer through
    ``ins`` and ``fileno()``; the thread body (``run()``) writes the prepared
    frames into the other end, sleeping for the given gap after each frame.

    Use it as a context manager: entering creates the socket pair and starts
    the sender thread, leaving closes both sockets and waits for the sender
    to terminate.
    """

    def __init__(self, frame_defs):
        """Remember the frames to replay; no sockets are created yet.

        :param frame_defs: iterable of ``(frame, ifg)`` pairs. ``frame`` is
            handed to ``socket.send()`` as-is and ``ifg`` is the number of
            seconds to sleep after sending it.
        """
        super(FooSocket, self).__init__()
        self._frame_defs = frame_defs
        # Both ends stay unset until __enter__ creates the pair.
        self._s0 = None
        self._s1 = None
        # NOTE(review): ``ins`` is set by hand here and in __enter__ --
        # presumably because the SuperSocket initialiser is not relied upon
        # in this Thread/SuperSocket mix; confirm against the scapy version.
        self.ins = None

    def __enter__(self):
        """Create the socket pair, publish the read end and start sending."""
        self._s0, self._s1 = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
        self.ins = self._s1
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close both socket ends and wait for the sender thread to finish.

        Exceptions raised inside the ``with`` body are not suppressed.
        """
        # Close first: any later send() in run() then fails with socket.error,
        # which makes the sender loop stop.
        for sock in (self._s0, self._s1):
            if sock is not None:
                sock.close()
        # Wait for the sender so it never outlives the context. This can take
        # up to the inter-frame gap the sender is currently sleeping through.
        if self.is_alive():
            self.join()
        self._s0 = None
        self._s1 = None

    def run(self):
        """Thread body: send every frame, pausing ``ifg`` seconds after each.

        Stops early as soon as the sending socket has been closed or removed.
        """
        for frame, ifg in self._frame_defs:
            try:
                self._s0.send(frame)
            except (socket.error, AttributeError):
                # Socket closed (socket.error) or already reset to None
                # (AttributeError) by __exit__: nothing left to do.
                break
            time.sleep(ifg)

    def fileno(self):
        """Return the file descriptor of the read end of the socket pair."""
        return self._s1.fileno()
def _dump(p):
    """Per-packet callback: print packet ``p`` by calling its ``show()``."""
    p.show()
# (frame bytes, seconds to wait after sending) pairs replayed by the sender.
# NOTE(review): the 0.08 s gap after bar_3 is the only one larger than the
# 0.06 s interpkt_timeout passed to sniff() below -- presumably chosen to
# trigger that timeout; confirm.
test_frame_set = [
    ((Ether() / IP() / TCP() / Raw(b'bar_0')).do_build(), 0.02),
    ((Ether() / IP() / TCP() / Raw(b'bar_1')).do_build(), 0.02),
    ((Ether() / IP() / TCP() / Raw(b'bar_2')).do_build(), 0.02),
    ((Ether() / IP() / TCP() / Raw(b'bar_3')).do_build(), 0.08),
    ((Ether() / IP() / TCP() / Raw(b'bar_4')).do_build(), 0.02),
]
# Replay the prepared frames through a FooSocket and hand every packet that
# sniff() reads from it to _dump.
with FooSocket(test_frame_set) as fs:
    sniff(opened_socket=fs, timeout=1, interpkt_timeout=0.06, prn=_dump)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I'm a beginner with scapy. How can we use opened_socket? I'm a little bit confused.