Skip to content

Instantly share code, notes, and snippets.

@SkypLabs
Last active April 24, 2023 12:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save SkypLabs/06bd7f414f51d700e04be705cb32659d to your computer and use it in GitHub Desktop.
Save SkypLabs/06bd7f414f51d700e04be705cb32659d to your computer and use it in GitHub Desktop.
Multiple code examples used to demonstrate some issues and a solution to sniff network packets inside a thread using Scapy
from scapy.all import *
interface = "eth0"
def print_packet(packet):
ip_layer = packet.getlayer(IP)
print("[!] New Packet: {src} -> {dst}".format(src=ip_layer.src, dst=ip_layer.dst))
print("[*] Start sniffing...")
sniff(iface=interface, filter="ip", prn=print_packet)
print("[*] Stop sniffing")
from scapy.all import *
from threading import Thread
from time import sleep
class Sniffer(Thread):
def __init__(self, interface="eth0"):
super().__init__()
self.interface = interface
def run(self):
sniff(iface=self.interface, filter="ip", prn=self.print_packet)
def print_packet(self, packet):
ip_layer = packet.getlayer(IP)
print("[!] New Packet: {src} -> {dst}".format(src=ip_layer.src, dst=ip_layer.dst))
sniffer = Sniffer()
print("[*] Start sniffing...")
sniffer.start()
try:
while True:
sleep(100)
except KeyboardInterrupt:
print("[*] Stop sniffing")
sniffer.join()
from scapy.all import *
from threading import Thread, Event
from time import sleep
class Sniffer(Thread):
def __init__(self, interface="eth0"):
super().__init__()
self.interface = interface
self.stop_sniffer = Event()
def run(self):
sniff(iface=self.interface, filter="ip", prn=self.print_packet, stop_filter=self.should_stop_sniffer)
def join(self, timeout=None):
self.stop_sniffer.set()
super().join(timeout)
def should_stop_sniffer(self, packet):
return self.stop_sniffer.isSet()
def print_packet(self, packet):
ip_layer = packet.getlayer(IP)
print("[!] New Packet: {src} -> {dst}".format(src=ip_layer.src, dst=ip_layer.dst))
sniffer = Sniffer()
print("[*] Start sniffing...")
sniffer.start()
try:
while True:
sleep(100)
except KeyboardInterrupt:
print("[*] Stop sniffing")
sniffer.join()
from scapy.all import *
from threading import Thread, Event
from time import sleep
class Sniffer(Thread):
def __init__(self, interface="eth0"):
super().__init__()
self.daemon = True
self.socket = None
self.interface = interface
self.stop_sniffer = Event()
def run(self):
self.socket = conf.L2listen(
type=ETH_P_ALL,
iface=self.interface,
filter="ip"
)
sniff(
opened_socket=self.socket,
prn=self.print_packet,
stop_filter=self.should_stop_sniffer
)
def join(self, timeout=None):
self.stop_sniffer.set()
super().join(timeout)
def should_stop_sniffer(self, packet):
return self.stop_sniffer.isSet()
def print_packet(self, packet):
ip_layer = packet.getlayer(IP)
print("[!] New Packet: {src} -> {dst}".format(src=ip_layer.src, dst=ip_layer.dst))
sniffer = Sniffer()
print("[*] Start sniffing...")
sniffer.start()
try:
while True:
sleep(100)
except KeyboardInterrupt:
print("[*] Stop sniffing")
sniffer.join(2.0)
if sniffer.isAlive():
sniffer.socket.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment