Skip to content

Instantly share code, notes, and snippets.

@berdario
Created April 21, 2015 16:28
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save berdario/cdc42b46f4023357c817 to your computer and use it in GitHub Desktop.
Save berdario/cdc42b46f4023357c817 to your computer and use it in GitHub Desktop.
import sys
import struct
from collections import namedtuple
from mmap import mmap, PROT_READ
Header = namedtuple('Header', 'magic_number version_major version_minor thiszone sigfigs snaplen network')
RecordHeader = namedtuple('RecordHeader', 'timestamp timestamp_us size original_size')
def get_parser(record_type, struct_fmt, bigendian=False):
struct_fmt = ('>' if bigendian else '<') + struct_fmt
return lambda x: record_type(*struct.unpack(struct_fmt, x))
def detect_endianness(four_bytes):
(magicnum, ) = struct.unpack('<i', four_bytes)
if magicnum == -1582119980:
return False
# little endian
elif magicnum == -725372255:
return True
# big endian
else:
raise ValueError('not a pcap file')
def parse_record(mm, header_parser):
header = header_parser(mm.read(16))
packet = mm.read(header.size)
return header, packet
def get_packets(mm, header_parser):
filesize = mm.size()
while mm.tell() != filesize:
try:
header, _ = parse_record(mm, header_parser)
yield header.size
except struct.error:
print('malformed packet', file=sys.stderr)
yield None
def main(fname):
with open(fname, 'rb') as fd, mmap(fd.fileno(), 0, prot=PROT_READ) as mm:
endianness = detect_endianness(mm.read(4))
mm.seek(0)
parse_header = get_parser(Header, 'ihhiiii', endianness)
parse_record_header = get_parser(RecordHeader, 'iiii', endianness)
parse_header(mm.read(24))
for p in get_packets(mm, parse_record_header):
print(p)
if __name__ == '__main__':
main(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment