Skip to content

Instantly share code, notes, and snippets.

@joviniko
Created August 13, 2019 15:40
Show Gist options
  • Save joviniko/91d72028a3ab2a754a14c23c3a5c3a39 to your computer and use it in GitHub Desktop.
Save joviniko/91d72028a3ab2a754a14c23c3a5c3a39 to your computer and use it in GitHub Desktop.
Export every n-th packet to a new pcap file. Similar to sFlow https://en.wikipedia.org/wiki/SFlow
import sys
import struct
if len(sys.argv) != 4 or not str.isdigit(sys.argv[3]):
print("Usage: python " + sys.argv[0] + " input.pcap output.pcap 100")
sys.exit("Arguments error. Expected: input.pcap output.pcap packet-modulo-integer")
"""
pcap file format library
2010-01 ZHAW, Martin Renold
2010-02 ZHAW, David Gunzinger
public domain
"""
class PcapFile:
def __init__(self, filename, mode='r', nanoseconds=False):
self.mode = mode
if mode == 'r':
self.f = open(filename,'rb')
self.__read_header()
elif mode == 'w':
self.nanoseconds = nanoseconds
self.f = open(filename, 'wb')
self.__write_header()
else:
raise NotImplementedError('mode not supported, must be r or w')
self.pktnr = 1
def __write_header(self):
if self.mode != 'w':
raise NotImplementedError('can only write header in w mode')
if self.nanoseconds:
magic = 0xa1b23c4d
else:
magic = 0xa1b2c3d4
data = struct.pack('IHHiIII',
magic, # guint32 magic_number; /* magic number */
2, # guint16 version_major; /* major version number */
4, # guint16 version_minor; /* minor version number */
0, # gint32 thiszone; /* GMT to local correction */
0, # guint32 sigfigs; /* accuracy of timestamps */
65535, # guint32 snaplen; /* max length of captured packets, in octets */
1, # guint32 network; /* data link type */
)
self.f.write(data)
def __read_header(self):
if self.mode != 'r':
raise NotImplementedError('can only read header in r mode')
header = self.f.read(24)
header_unpack=struct.unpack('IHHiIII',header)
#self.header = header_unpack
if (header_unpack[0] == 0xa1b23c4d): # PCAP_NSEC_MAGIC
self.nanoseconds = True
elif(header_unpack[0] == 0xa1b2c3d4):
self.nanoseconds = False
else:
raise NotImplementedError("header magic %x not yet implemented"%(header_unpack[0],))
def get_mode(self):
"""
returns the mode of the class: r for reading, w for writing
"""
return self.mode
def get_nanoseconds(self):
"""
returns if ts_subsec is nanoseconds or seconds
"""
return self.nanoseconds
def get_packetnr(self):
"""
returns the packetnr
"""
return self.pktnr
def write_frame(self, data, ts_sec=0, ts_subsec=0):
"""
ts_subsec is in usec by default. If the file was opened with nanoseconds=True, it is in nanoseconds.
"""
data = struct.pack('IIII',
ts_sec, # guint32 ts_sec; /* timestamp seconds */
ts_subsec, # guint32 ts_usec; /* timestamp <s>microseconds</s>sub seconds */
len(data), # guint32 incl_len; /* number of octets of packet saved in file */
len(data), # guint32 orig_len; /* actual length of packet */
) + data
self.f.write(data)
self.pktnr+=1
def read_frame(self):
"""
ts_subsec is in usec by default. If the file was opened with nanoseconds=True, it is in nanoseconds.
"""
header = self.f.read(16)
if(len(header) != 16):
raise EOFError()
header_unpack = struct.unpack("IIII",header)
hdr = {"ts_sec":header_unpack[0],
"ts_subsec":header_unpack[1],
"incl_len":header_unpack[2],
"orig_len":header_unpack[3],
"nr":self.pktnr}
self.pktnr+=1
data = self.f.read(hdr["incl_len"])
if(len(data) != hdr['incl_len']):
raise EOFError()
return (hdr,data)
def __iter__(self):
while 1:
try:
yield self.read_frame()
except EOFError:
return
def close(self):
self.f.close()
orig_file = PcapFile(sys.argv[1])
new_file = PcapFile(sys.argv[2], mode='w')
packet_modulo = int(sys.argv[3])
for packet in orig_file:
if packet[0]["nr"] % packet_modulo == 0:
new_file.write_frame(packet[1], ts_sec=packet[0]['ts_sec'], ts_subsec=packet[0]['ts_subsec'])
orig_file.close()
new_file.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment