Skip to content

Instantly share code, notes, and snippets.

@viz-prakash
Created January 27, 2020 02:55
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save viz-prakash/544ef1195b34ec84f4ff2d2b77b47d30 to your computer and use it in GitHub Desktop.
Save viz-prakash/544ef1195b34ec84f4ff2d2b77b47d30 to your computer and use it in GitHub Desktop.
Python script for Pcap parsing using Scapy, along with performance testing
#!/usr/bin/env python3
# -*- coding: ISO-8859-15 -*-
"""
This file contains some example methods of how pcaps can be parsed, filtered in different ways, and
converted to JSON representation with scapy and tshark(tshark is directly invoked on pcap).
It shows an example of how a tcp session can be extracted from a huge pcap consisting of multiple
sessions.
It also benchmarks those methods, but don't take the output at face value.
Each method is timed end to end, from the beginning until the end: that is, from reading the
pcap through to the needed operation, such as extracting a TCP session or building the JSON
representation of a TCP session. I did this because I wanted to test the case where only one
operation has to be done on the pcap. If the need is to do multiple operations, Scapy only has
to read the pcap once; later operations don't need to read the same file again, which means the
time incurred by the read affects the performance only once. So, what's the conclusion of the
performance testing? In my opinion, if you have to do multiple operations on a pcap, go with pure
Scapy calls: read it once and perform as many operations as you need, and it is going to be
faster than invoking tshark. But if you need to perform only one operation on a pcap, go with the
tshark process call; Scapy can't match it, because the most expensive operation is reading the pcap.
In addition to all that, this script shows how one can do performance testing in Python with the
timeit module.
I am aware that I am being sloppy by not writing documentation for the methods, but I just wanted to
learn quickly how to use Scapy and how the different ways of doing what I wanted to do perform.
I am also aware that the hashes of the extracted pcap sessions are not the same, but I verified
that the contents of the pcaps are exactly the same, except that for some reason Scapy sets the
Ethernet capture length to 65535, whereas tshark sets something different.
Feel free to use this script in any shape or form.
Cheers!
This script was tested with following configuration:
Scapy version: 2.4.3
Python version: 3.6.9
Operating System: macOS Mojave
You need tshark installed to invoke the tshark binary.
"""
import subprocess
import timeit
import argparse
import sys
from hashlib import sha256
from scapy.all import *
from collections import OrderedDict
def file_hash(file_path, hash_algo="sha256"):
    """Return the hex digest of a file's contents, read in fixed-size chunks.

    Args:
        file_path: Path of the file to hash.
        hash_algo: Name of an algorithm accepted by ``hashlib.new``
            (default ``"sha256"``). Variable-length digests (``shake_*``)
            are not supported because ``hexdigest()`` is called without a
            length.

    Returns:
        The hexadecimal digest string of the file contents.

    Raises:
        ValueError: If ``hash_algo`` is not a known algorithm name.
        OSError: If the file cannot be opened or read.
    """
    # Local import: the module only imports ``sha256`` by name, and resolving
    # the algorithm by name is what lets values other than "sha256" work
    # instead of leaving the digest object undefined.
    import hashlib

    buf_size = 65536
    digester = hashlib.new(hash_algo)
    with open(file_path, 'rb') as file_ref:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for buf in iter(lambda: file_ref.read(buf_size), b""):
            digester.update(buf)
    return digester.hexdigest()
def filter_packets(pkts, src, dst, sp, dp, proto=TCP):
    """Select the packets of one bidirectional IPv4 session.

    A packet is kept when it carries both an ``IP`` layer and a ``proto``
    layer and its (source address, destination address, source port,
    destination port) equals either ``(src, dst, sp, dp)`` or the reversed
    direction ``(dst, src, dp, sp)``.

    Args:
        pkts: Iterable of Scapy packets.
        src: IPv4 address of one endpoint.
        dst: IPv4 address of the other endpoint.
        sp: Port used by ``src``.
        dp: Port used by ``dst``.
        proto: Scapy transport layer class exposing ``sport``/``dport``
            (default ``TCP``).

    Returns:
        A list of the matching packets, in their original order.
    """
    forward = (src, dst, sp, dp)
    backward = (dst, src, dp, sp)

    def _in_session(pkt):
        # Packets without an IPv4 layer (ARP, TCP over IPv6, ...) cannot be
        # indexed with pkt[IP]; skip them instead of raising.
        if IP not in pkt or proto not in pkt:
            return False
        ip_layer = pkt[IP]
        l4_layer = pkt[proto]
        endpoints = (ip_layer.src, ip_layer.dst, l4_layer.sport, l4_layer.dport)
        return endpoints == forward or endpoints == backward

    return [pkt for pkt in pkts if _in_session(pkt)]
def pkt_dict(pkt):
    """Flatten a packet's layer chain into ``{layer name: {field: repr(value)}}``.

    Layers are visited from the outermost one inward by following
    ``payload`` until a layer named ``"NoPayload"`` is reached. If a layer
    name occurs more than once, the innermost occurrence's fields win while
    the key keeps the position of its first occurrence.

    Returns:
        An empty dict when ``pkt`` itself is ``"NoPayload"``; otherwise an
        ``OrderedDict`` mapping layer names to ``OrderedDict``s of
        ``repr``-ed field values.
    """
    if pkt.name == "NoPayload":
        return {}

    layers = OrderedDict()
    layer = pkt
    while layer.name != "NoPayload":
        layers[layer.name] = OrderedDict(
            (field, repr(value)) for field, value in layer.fields.items()
        )
        layer = layer.payload
    return layers
def dict_repr_tcp_session(pkts):
    """Return a list holding the ``pkt_dict`` representation of each packet in ``pkts``."""
    return [pkt_dict(packet) for packet in pkts]
def dict_repr_tcp_session_with_filter(pkts, src, dst, sp, dp, proto=TCP):
    """Filter ``pkts`` with ``filter_packets`` and return the dict form of the matches."""
    session_pkts = filter_packets(pkts, src, dst, sp, dp, proto)
    session_dicts = dict_repr_tcp_session(session_pkts)
    return session_dicts
def dict_repr_tcp_session_with_filter_pcap(in_pcap, src, dst, sp, dp, proto=TCP):
    """Read ``in_pcap`` with ``rdpcap`` and return the dict form of the filtered session."""
    return dict_repr_tcp_session_with_filter(rdpcap(in_pcap), src, dst, sp, dp, proto)
def json_repr_tcp_session(pkts):
    """Return tshark's JSON dissection (``-T json``) of ``pkts`` as text.

    The packets are handed to tshark through Scapy's ``tcpdump()`` helper
    with ``dump=True`` so that tshark's output is returned to the caller.

    Args:
        pkts: Packets (or anything ``tcpdump()`` accepts) to dissect.

    Returns:
        The tshark output as a ``str``.
    """
    tshark_kwargs = {
        "prog": conf.prog.tshark,
        "dump": True,
        "args": ["-T", "json"],
    }
    if sys.platform == "darwin":
        # A temp file is used because Scapy has an issue on macOS which
        # causes tcpdump() to hang forever otherwise.
        tshark_kwargs["use_tempfile"] = True

    output = tcpdump(pkts, **tshark_kwargs)
    # tcpdump(dump=True) is expected to return the raw stdout as bytes on
    # Python 3; str() on bytes would yield a "b'...'" repr instead of the
    # JSON text, so decode explicitly. The isinstance check keeps any
    # non-bytes return value working as before.
    if isinstance(output, bytes):
        return output.decode("utf-8", errors="replace")
    return str(output)
def json_repr_tcp_session_with_filter(pkts, host, port):
    """Return tshark's JSON dissection of the packets matching a host and TCP port.

    tshark is invoked through Scapy's ``tcpdump()`` helper with the display
    filter ``ip.host == <host> && tcp.port == <port>``, for example
    ``ip.host == 100.1.1.105 && tcp.port == 39496``.

    Args:
        pkts: Packets (or anything ``tcpdump()`` accepts) to dissect.
        host: Value substituted for ``ip.host`` in the display filter.
        port: Value substituted for ``tcp.port`` in the display filter.

    Returns:
        The tshark output as a ``str``.
    """
    display_filter = "ip.host == {} && tcp.port == {}".format(host, port)
    tshark_kwargs = {
        "prog": conf.prog.tshark,
        "dump": True,
        "args": ["-Y", display_filter, "-T", "json"],
    }
    if sys.platform == "darwin":
        # A temp file is used because of a Scapy issue on macOS which causes
        # tcpdump() to hang forever otherwise.
        tshark_kwargs["use_tempfile"] = True

    output = tcpdump(pkts, **tshark_kwargs)
    # tcpdump(dump=True) is expected to return the raw stdout as bytes on
    # Python 3; str() on bytes would yield a "b'...'" repr instead of the
    # JSON text, so decode explicitly. The isinstance check keeps any
    # non-bytes return value working as before.
    if isinstance(output, bytes):
        return output.decode("utf-8", errors="replace")
    return str(output)
def json_repr_tcp_session_with_filter_using_scapy(in_pcap, src, dst, sp, dp, proto=TCP):
    """Read ``in_pcap``, filter the session with ``filter_packets``, and return its JSON form.

    The filtered packets are passed to ``json_repr_tcp_session_with_filter``
    with ``src`` as the host and ``sp`` as the port of the tshark display
    filter.
    """
    session_pkts = filter_packets(rdpcap(in_pcap), src, dst, sp, dp, proto)
    return json_repr_tcp_session_with_filter(session_pkts, src, sp)
def json_repr_tcp_session_with_filter_using_scapy_tshark(in_pcap, host, port):
    """Read ``in_pcap`` with ``rdpcap`` and return the tshark-filtered JSON for ``host``/``port``."""
    return json_repr_tcp_session_with_filter(rdpcap(in_pcap), host, port)
def json_repr_tcp_session_tshark(in_pcap, host, port):
    """Run the ``tshark`` binary on ``in_pcap`` and return its JSON output as text.

    tshark is invoked with the display filter
    ``ip.host == <host> && tcp.port == <port>`` and ``-Tjson``.

    Args:
        in_pcap: Path of the capture file to read.
        host: Value substituted for ``ip.host`` in the display filter.
        port: Value substituted for ``tcp.port`` in the display filter.

    Returns:
        Everything tshark wrote, decoded as text. stderr is merged into
        stdout, so tshark error messages appear in the returned string.

    Raises:
        FileNotFoundError: If the ``tshark`` executable cannot be found.
    """
    # An argument list (no shell) keeps paths containing spaces or quotes
    # intact and avoids shell interpretation of the filter expression.
    cmd = [
        "tshark",
        "-r", str(in_pcap),
        "-Y", "ip.host == {} && tcp.port == {}".format(host, port),
        "-Tjson",
    ]
    res = subprocess.run(cmd, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    # Return the captured output itself rather than the repr of the
    # CompletedProcess object.
    return res.stdout.decode("utf-8", errors="replace")
def ex_tcp_session_with_scapy_tshark(pkts, out_pcap, host, port):
    """Pipe ``pkts`` through tshark via Scapy's ``tcpdump()`` and write the matches to ``out_pcap``.

    tshark is given the display filter
    ``ip.host == <host> && tcp.port == <port>`` and asked to write the
    result in pcap format (``-F pcap -w <out_pcap>``).
    """
    display_filter = "ip.host == {} && tcp.port == {}".format(host, port)
    tshark_args = ["-Y", display_filter, "-F", "pcap", "-w", out_pcap]
    tcpdump(pkts, prog=conf.prog.tshark, args=tshark_args)
def ex_tcp_session_filter_with_scapy_tshark_from_pcap(in_pcap, out_pcap, host, port):
    """Read ``in_pcap`` with ``rdpcap`` and extract the ``host``/``port`` session to ``out_pcap``."""
    ex_tcp_session_with_scapy_tshark(rdpcap(in_pcap), out_pcap, host, port)
def ex_tcp_session(pkts, out_pcap, src, dst, sp, dp, proto=TCP):
    """Write the packets selected by ``filter_packets`` to ``out_pcap`` using ``wrpcap``."""
    session_pkts = filter_packets(pkts, src, dst, sp, dp, proto)
    wrpcap(out_pcap, session_pkts)
def ex_tcp_session_from_pcap(in_pcap, out_pcap, src, dst, sp, dp, proto=TCP):
    """Read ``in_pcap`` with ``rdpcap`` and write the filtered session to ``out_pcap``."""
    ex_tcp_session(rdpcap(in_pcap), out_pcap, src, dst, sp, dp, proto)
def ex_tcp_session_with_tshark(in_pcap, out_pcap, host, port):
    """Run the ``tshark`` binary to extract one session from ``in_pcap`` into ``out_pcap``.

    tshark is invoked with the display filter
    ``ip.host == <host> && tcp.port == <port>`` and writes the matching
    packets in pcap format (``-w <out_pcap> -F pcap``). tshark's own output
    is captured and discarded.

    Args:
        in_pcap: Path of the capture file to read.
        out_pcap: Path of the pcap file to write.
        host: Value substituted for ``ip.host`` in the display filter.
        port: Value substituted for ``tcp.port`` in the display filter.

    Raises:
        FileNotFoundError: If the ``tshark`` executable cannot be found.
    """
    # An argument list (no shell) keeps paths containing spaces or quotes
    # intact and avoids shell interpretation of the filter expression.
    cmd = [
        "tshark",
        "-r", str(in_pcap),
        "-Y", "ip.host == {} && tcp.port == {}".format(host, port),
        "-w", str(out_pcap),
        "-F", "pcap",
    ]
    subprocess.run(cmd, stdout=subprocess.PIPE,
                   stderr=subprocess.STDOUT)
if __name__ == "__main__":
    # Benchmark driver: times every pcap-processing approach defined above
    # against one hard-coded TCP session of the input pcap and prints the
    # average time per call. For the extraction benchmarks it also prints
    # the SHA-256 of the pcap each approach wrote.
    parser = argparse.ArgumentParser(description="Examples of pcap extrcation from bigger pcap, "
                                     "dictionary representation of pcaps, and performance test.")
    # required=True: without an input pcap there is nothing to benchmark,
    # so fail with a usage message instead of trying to read a file "None".
    parser.add_argument("-i", "--input-pcap", required=True,
                        help="input pcap with with multiple different "
                        "protocol sessions")
    args = parser.parse_args()

    num_op = 10
    src = "100.1.1.105"
    dst = "200.1.20.195"
    sp = 39496
    dp = 80
    out_pcap_file = "filtered_out.pcap"
    in_pcap = args.input_pcap

    # Callables are timed instead of generated source strings so that a pcap
    # path containing quotes or backslashes cannot break the statement.
    tot = timeit.timeit(stmt=lambda: rdpcap(in_pcap), number=num_op)
    print('Time taken per rdpcap: {}, which would be saved in further Scapy calls, '
          'once pcap is read.\nSo, this could be subtracted from all the pure Scapy '
          'calls'.format(tot/num_op))

    out_scapy = "1-{}".format(out_pcap_file)
    out_tshark = "2-{}".format(out_pcap_file)
    out_scapy_tshark = "3-{}".format(out_pcap_file)

    # Each entry: (function to time, its positional arguments,
    #              written pcap to hash afterwards or None).
    benchmarks = [
        (dict_repr_tcp_session_with_filter_pcap,
         (in_pcap, src, dst, sp, dp), None),
        (json_repr_tcp_session_with_filter_using_scapy,
         (in_pcap, src, dst, sp, dp), None),
        (json_repr_tcp_session_with_filter_using_scapy_tshark,
         (in_pcap, src, sp), None),
        (json_repr_tcp_session_tshark,
         (in_pcap, src, sp), None),
        (ex_tcp_session_from_pcap,
         (in_pcap, out_scapy, src, dst, sp, dp), out_scapy),
        (ex_tcp_session_with_tshark,
         (in_pcap, out_tshark, src, sp), out_tshark),
        (ex_tcp_session_filter_with_scapy_tshark_from_pcap,
         (in_pcap, out_scapy_tshark, src, sp), out_scapy_tshark),
    ]

    for func, call_args, written_pcap in benchmarks:
        # The lambda is invoked only inside this iteration's timeit call,
        # so it always sees the current func/call_args.
        tot = timeit.timeit(stmt=lambda: func(*call_args), number=num_op)
        print("Time taken per {} call: {}".format(func.__name__, tot/num_op))
        if written_pcap is not None:
            print(file_hash(written_pcap))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment