Skip to content

Instantly share code, notes, and snippets.

@fahadysf
Last active September 22, 2018 13:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fahadysf/de7810a308115395d5373f1248b78b2c to your computer and use it in GitHub Desktop.
Save fahadysf/de7810a308115395d5373f1248b78b2c to your computer and use it in GitHub Desktop.
This script extracts TCP flows from a packet capture file and finds common hex patterns of a given length (default 7 bytes). Useful for custom App-ID creation.
#!/usr/bin/env python
"""
Copyright (C) 2018
Author: Fahad Yousuf
This script extracts TCP flows from a packet capture file and finds
common hex patterns of a given length (default 7 bytes).
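Usage: flowextractor.py [-h] [-d DPORT] [-n LENGTH] inputfile
positional arguments:
inputfile Input file in pcap format
optional arguments:
-h, --help show this help message and exit
-d DPORT, --dport DPORT
Destination Port of interest
-n LENGTH, --length LENGTH
Minimum length of pattern to match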
"""
import dpkt
import hashlib
from dpkt.compat import compat_ord
import datetime
import socket
import binascii
import string
import argparse
#Setting up the argument parser
parser = argparse.ArgumentParser()
parser.add_argument("inputfile", help="Input file in pcap format", action='store', type=str)
parser.add_argument("-d", "--dport", help="Destination Port of interest", action='store', type=int)
parser.add_argument("-n", "--length", help="Minimum length of pattern to match", default=7, action='store', type=int)
args = parser.parse_args()
filename = args.inputfile
# Helper functions to convert MAC and IP into readable strings
def mac_addr(address):
    """Render a raw MAC address as colon-separated, two-digit hex octets.

    Args:
        address (str): the MAC address as a sequence of raw bytes
    Returns:
        str: Printable form of the address, e.g. '01:02:03:04:05:06'
    """
    # compat_ord (from dpkt.compat) turns each element into its integer value.
    octets = ['%02x' % compat_ord(octet) for octet in address]
    return ':'.join(octets)
def inet_to_str(inet):
    """Turn a packed network address into its textual representation.

    Args:
        inet (inet struct): packed network address
    Returns:
        str: Printable IP address
    """
    # Interpret the value as IPv4 first; when that raises ValueError, fall
    # back to IPv6 and let any error from that attempt propagate.
    try:
        readable = socket.inet_ntop(socket.AF_INET, inet)
    except ValueError:
        readable = socket.inet_ntop(socket.AF_INET6, inet)
    return readable
def hex_to_ascii(hexstr):
    """Convert a hex string into a printable ASCII preview.

    Every decoded byte that is not in ``string.printable`` is shown as '.'.
    Leading and trailing whitespace is stripped from the result.

    Args:
        hexstr (str): Hex digits; an odd-length value is left-padded with '0'
    Returns:
        str: Printable rendering of the decoded bytes
    Raises:
        binascii.Error / TypeError: if hexstr contains non-hex characters
            (the exception type depends on the Python version)
    """
    if len(hexstr) % 2 == 1:
        hexstr = "0" + hexstr
    # binascii.unhexlify replaces str.decode("hex"), which only exists on
    # Python 2. Wrapping the result in bytearray yields integers when iterated
    # on both Python 2 and Python 3.
    raw = bytearray(binascii.unhexlify(hexstr))
    output = "".join(
        chr(byte) if chr(byte) in string.printable else '.' for byte in raw
    )
    return output.strip()
def generate_flow_name(src_ip, dest_ip, sport, dport, seq):
    """Build the human-readable label of a TCP flow.

    Args:
        src_ip (str): Source address
        dest_ip (str): Destination address
        sport: Source port (converted with int())
        dport: Destination port (converted with int())
        seq: Sequence number identifying the flow (converted with int())
    Returns:
        str: Label of the form 'src:sport->dst:dport (seq)'
    """
    template = "{0}:{1}->{2}:{3} ({4})"
    source_port = int(sport)
    destination_port = int(dport)
    sequence = int(seq)
    return template.format(src_ip, source_port, dest_ip, destination_port, sequence)
def generate_flow_id(flow_name):
    """Generate an ID for a specific TCP flow.

    Args:
        flow_name (str): Flow name
    Returns:
        str: Unique ID (hex SHA256 digest of the flow name)
    """
    # hashlib only accepts bytes on Python 3, so text is encoded first.
    # On Python 2 a plain str already is bytes and is hashed unchanged.
    if not isinstance(flow_name, bytes):
        flow_name = flow_name.encode("utf-8")
    return hashlib.sha256(flow_name).hexdigest()
def common_substr(a, b, k):
    """Return the first length-k substring of b that also occurs in a.

    Args:
        a (str): First string
        b (str): Second string
        k (int): Length of substring to find
    Returns:
        str: The earliest length-k window of b that is present in a, or
        None when the two strings share no such window
    """
    # Collect every length-k window of `a` once for constant-time lookups.
    windows_of_a = set()
    for start in range(len(a) - k + 1):
        windows_of_a.add(a[start:start + k])
    # Walk the windows of `b` from left to right and stop at the first hit.
    windows_of_b = (b[start:start + k] for start in range(len(b) - k + 1))
    return next(
        (window for window in windows_of_b if window in windows_of_a), None
    )
def find_all_common_substrings(str_a, str_b, size):
    """Collect the length-`size` substrings shared by two strings.

    Repeatedly asks common_substr() for a shared substring, records it and
    deletes every occurrence of it from both strings before searching again.

    NOTE: deleting a match joins the text on either side of it, so a later
    match may span such a junction and not occur verbatim in the original
    inputs.

    Args:
        str_a (str): First string
        str_b (str): Second string
        size (int): Length of the substrings to find
    Returns:
        list(str): Common substrings in the order they were found; empty
        when size is not positive or nothing is shared
    """
    # A non-positive size makes common_substr() return '' on every call.
    # Removing '' changes nothing, so the loop below would never terminate.
    if size < 1:
        return []
    found_strings = list()
    substr = common_substr(str_a, str_b, size)
    while substr is not None:
        found_strings.append(substr)
        str_a = str_a.replace(substr, '')
        str_b = str_b.replace(substr, '')
        substr = common_substr(str_a, str_b, size)
    return found_strings
def find_common_substrings_in_string_array(string_list, size):
    """Find common substrings in a list of strings and count their hits.

    The strings are compared pairwise in sorted order, starting from the end
    of the sorted list, until one pair yields at least one common substring
    or every string has been looked at. Each substring found is then counted
    across the whole list.

    Args:
        string_list (list): List of strings to search (not modified)
        size (int): Length of substring to find
    Returns:
        dict: Maps each common substring to a dict with the keys
        "hits" (number of strings containing it), "count" (number of
        strings searched), "hit_percentage" and "printable-string"
        (the substring passed through hex_to_ascii()). Empty when fewer
        than two strings are given or nothing is shared.
    """
    # Work on a sorted copy so the caller's list keeps its order.
    ordered = sorted(string_list)
    common_strings = dict()
    # A comparison needs two strings; with fewer there is nothing to do.
    if len(ordered) < 2:
        return common_strings

    unchecked_indexes = list(range(len(ordered)))
    str_b = None
    # Keep checking for common strings until at least one has been found
    # or all source strings have been checked.
    while not common_strings and unchecked_indexes:
        if len(unchecked_indexes) >= 2:
            str_a = ordered[unchecked_indexes.pop()]
            str_b = ordered[unchecked_indexes.pop()]
        else:
            # Odd number of strings: pair the leftover one with the string
            # compared last in the previous round.
            str_a = str_b
            str_b = ordered[unchecked_indexes.pop()]
        for substr in find_all_common_substrings(str_a, str_b, size):
            common_strings[substr] = dict()

    total = len(ordered)
    for cstr, stats in common_strings.items():
        hits = sum(1 for candidate in ordered if cstr in candidate)
        stats["hits"] = hits
        stats["count"] = total
        stats["hit_percentage"] = 100.0 * float(hits) / float(total)
        stats["printable-string"] = hex_to_ascii(cstr)
    return common_strings
def flow_extractor(pcapreader):
    """Group TCP payloads from a packet capture by flow.

    A flow is created for every TCP segment that has SYN set and ACK clear.
    Segments carrying data are then attached to the flow opened with the same
    address/port 4-tuple ("forward") or the mirrored 4-tuple ("reverse").
    Data for which no SYN was seen is ignored. When the module-level
    ``args.dport`` is set, only segments whose source or destination port
    equals it are considered.

    Args:
        pcapreader: Iterable yielding (timestamp, raw frame) pairs, e.g. a
            dpkt.pcap.Reader
    Returns:
        dict: Maps flow id (see generate_flow_id) to a dict with the keys
        "name", "inverted", "src", "dst", "sport", "dport", "seq" and
        "payload". "payload" is a list of (direction, hex string) tuples
        without duplicates, direction being "forward" or "reverse".
    """
    flows = dict()
    # Maps the directed (src, sport, dst, dport) tuple of a SYN to the id of
    # the flow it opened. Looking data segments up by tuple, rather than by
    # "sequence number of the most recent SYN", keeps payloads attached to the
    # right flow when several connections are interleaved in the capture.
    active_flows = dict()

    # PCAP parsing starts here
    for ts, pkt in pcapreader:
        try:
            eth = dpkt.ethernet.Ethernet(pkt)
        except Exception:
            # Best effort: report the frame and move on. Without the
            # `continue`, `eth` would be undefined or left over from the
            # previous frame.
            print("Could not process frame at timestamp: %s" % str(ts))
            continue

        ip = eth.data
        if not (isinstance(ip, dpkt.ip.IP) and isinstance(ip.data, dpkt.tcp.TCP)):
            continue
        tcp = ip.data

        # Optional port filter: keep the segment if either end uses the port.
        if args.dport and args.dport not in (tcp.dport, tcp.sport):
            continue

        src = inet_to_str(ip.src)
        dst = inet_to_str(ip.dst)
        forward_key = (src, tcp.sport, dst, tcp.dport)
        reverse_key = (dst, tcp.dport, src, tcp.sport)

        # SYN without ACK: first segment of a new flow. Its sequence number
        # becomes part of the flow name.
        if (tcp.flags & dpkt.tcp.TH_SYN) != 0 and (tcp.flags & dpkt.tcp.TH_ACK) == 0:
            seq = tcp.seq
            flow_name = generate_flow_name(src, dst, tcp.sport, tcp.dport, seq)
            inverted_flow_string = generate_flow_name(dst, src, tcp.dport, tcp.sport, seq)
            flow_id = generate_flow_id(flow_name)
            flows[flow_id] = dict()
            flows[flow_id]["name"] = flow_name
            flows[flow_id]["inverted"] = inverted_flow_string
            flows[flow_id]["src"] = src
            flows[flow_id]["dst"] = dst
            flows[flow_id]["sport"] = tcp.sport
            flows[flow_id]["dport"] = tcp.dport
            flows[flow_id]["seq"] = seq
            flows[flow_id]["payload"] = list()
            active_flows[forward_key] = flow_id

        if len(tcp.data):
            payload = binascii.hexlify(tcp.data).decode()
            if forward_key in active_flows:
                entry = ("forward", payload)
                recorded = flows[active_flows[forward_key]]["payload"]
            elif reverse_key in active_flows:
                entry = ("reverse", payload)
                recorded = flows[active_flows[reverse_key]]["payload"]
            else:
                # No SYN seen for this connection; nothing to attach to.
                continue
            # Only add new payload if it is not a duplicate (e.g. a
            # retransmission already present in the list).
            if entry not in recorded:
                recorded.append(entry)
    return flows
if __name__ == "__main__":
    import json

    # Open the capture in a `with` block so the file handle is closed again.
    # flow_extractor() iterates over the whole reader before it returns, so
    # the file is no longer needed afterwards.
    with open(filename, 'rb') as pcapfile:
        flows = flow_extractor(dpkt.pcap.Reader(pcapfile))

    # Collect at most the first two payloads of every flow, split by direction.
    request_strings = list()
    response_strings = list()
    for flow in flows.values():
        for direction, payload in flow["payload"][:2]:
            if direction == "forward":
                request_strings.append(payload)
            elif direction == "reverse":
                response_strings.append(payload)

    print("Request Strings:")
    print(json.dumps(request_strings, indent=2))
    print("Response Strings:")
    print(json.dumps(response_strings, indent=2))
    print("")
    # Payloads are hex strings (two characters per byte), hence 2 * length.
    print(("Common {0}-byte substrings in requests with hit-count").format(args.length))
    print(json.dumps(find_common_substrings_in_string_array(request_strings, 2*args.length), indent=2))
    print(("Common {0}-byte substrings in responses with hit-count").format(args.length))
    print(json.dumps(find_common_substrings_in_string_array(response_strings, 2*args.length), indent=2))
@fahadysf
Copy link
Author

Revision #3 resolves an issue with "Duplicate" (re-transmit) payloads causing issues with detection of responses.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment