Skip to content

Instantly share code, notes, and snippets.

@speakinghedge
Created March 12, 2018 20:19
Show Gist options
  • Save speakinghedge/9f98f402808680b15215e404ef5f7ea8 to your computer and use it in GitHub Desktop.
Save speakinghedge/9f98f402808680b15215e404ef5f7ea8 to your computer and use it in GitHub Desktop.
use socketpair to test scapy sniff()
import threading

from scapy.all import *
class FooSocket(Thread, SuperSocket):
    """Thread-backed test socket that replays a fixed list of frames.

    Used as a context manager: entering creates a UNIX datagram socket pair,
    exposes one end through ``ins`` and ``fileno()``, and starts this thread,
    which writes every frame into the other end. Leaving the context stops
    the thread and closes both ends.

    NOTE(review): ``ins`` is presumably the attribute scapy's ``SuperSocket``
    receive path reads from -- confirm against the scapy version in use.
    """

    def __init__(self, frame_defs):
        """Store the frames to replay.

        Args:
            frame_defs: iterable of ``(frame, ifg)`` pairs; ``frame`` is the
                payload handed to ``socket.send`` and ``ifg`` is the pause
                in seconds observed after sending it.
        """
        super(FooSocket, self).__init__()
        self._frame_defs = frame_defs
        self.ins = None
        # Both ends exist only between __enter__ and __exit__; initialising
        # them here keeps __exit__ safe even if __enter__ never ran.
        self._s0 = None
        self._s1 = None
        # Set by __exit__ so the sender can leave its inter-frame pause early.
        self._halt = threading.Event()

    def __enter__(self):
        """Create the socket pair, start the sender thread, return ``self``."""
        self._s0, self._s1 = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
        self.ins = self._s1
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the sender thread and close both socket ends.

        Safe to call more than once, or without a preceding ``__enter__``.
        Exceptions raised inside the ``with`` body are not suppressed.
        """
        # Ask the sender to stop before tearing the sockets down.
        self._halt.set()
        # Detach first, then close, so a second call finds nothing to close.
        sender, receiver = self._s0, self._s1
        self._s0 = None
        self._s1 = None
        if sender is not None:
            sender.close()
        if receiver is not None:
            receiver.close()
        # Do not leave the thread running past the end of the context.
        if self.is_alive():
            self.join()

    def run(self):
        """Send each frame, pausing ``ifg`` seconds after it.

        The loop ends when all frames were sent, when ``__exit__`` signals a
        stop, or when sending fails because the socket is closed or gone.
        """
        for frame, ifg in self._frame_defs:
            if self._halt.is_set():
                break
            try:
                self._s0.send(frame)
            except (socket.error, AttributeError):
                # The socket was closed or detached by __exit__ mid-flight.
                break
            # Interruptible replacement for time.sleep(ifg): returns True
            # as soon as a stop is requested.
            if self._halt.wait(ifg):
                break

    def fileno(self):
        """Return the file descriptor of the receiving end of the pair."""
        return self._s1.fileno()
def _dump(p):
    """Per-packet callback: invoke the ``show()`` method of ``p``.

    The result of ``show()`` is discarded, so this always returns ``None``.
    """
    show = p.show
    show()
# Frames to replay, as (built frame, pause in seconds after sending it).
# Every frame is the same Ether/IP/TCP stack with a distinct raw payload.
test_frame_set = [
    ((Ether() / IP() / TCP() / Raw(payload)).do_build(), gap)
    for payload, gap in (
        (b'bar_0', 0.02),
        (b'bar_1', 0.02),
        (b'bar_2', 0.02),
        # This gap (0.08) is longer than the interpkt_timeout (0.06) used
        # below -- presumably to exercise that timeout; confirm against the
        # scapy version in use.
        (b'bar_3', 0.08),
        (b'bar_4', 0.02),
    )
]

# Sniff from the test socket and show every packet that arrives.
with FooSocket(test_frame_set) as fs:
    sniff(opened_socket=fs, timeout=1, interpkt_timeout=0.06, prn=_dump)
@alexSmaet
Copy link

I'm a beginner with scapy. How can we use opened_socket? I'm a little bit confused.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment