Last active: June 30, 2023 04:16
A script mostly written with copilot to parse and process packet captures.
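For a quick sense of how the helpers below fit together, here is a minimal usage sketch. The module name parse_pcap.py is an assumption (the gist doesn't name the file), and the pcap path is the one hardcoded in main().

# Hypothetical usage: assumes the gist is saved as parse_pcap.py and run from a
# directory containing pcap_files/hp_challenge.pcap (the path main() expects).
from parse_pcap import calculate_bytes_per_conv, open_pcap_and_read_it

packets = open_pcap_and_read_it("pcap_files/hp_challenge.pcap")
print(f"Parsed {len(packets)} TCP/UDP packets")

# Top five conversations by total bytes, keyed by Community ID
top_conversations = sorted(
    calculate_bytes_per_conv(packets).items(), key=lambda kv: kv[1], reverse=True
)[:5]
for community_id, total_bytes in top_conversations:
    print(f"{community_id}: {total_bytes} bytes")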
# Updated version for https://blog.rmtph.one/posts/CyberDefenders_EscapeRoom/
import datetime
import ipaddress
import json
import os
import socket
from collections import Counter, defaultdict

import communityid
import dpkt
from geoip import geolite2
from manuf import manuf

def calculate_community_id(src_ip, dst_ip, src_port, dest_port, proto="tcp") -> str:
    # Hash the flow tuple into a Community ID so packets in the same conversation share a key
    cid = communityid.CommunityID()
    if proto == "udp":
        tpl = communityid.FlowTuple.make_udp(src_ip, dst_ip, src_port, dest_port)
    else:
        tpl = communityid.FlowTuple.make_tcp(src_ip, dst_ip, src_port, dest_port)
    return cid.calc(tpl)

def calculate_bytes_per_conv(packets: list[dict]) -> dict:
    community_id_packet_counts = defaultdict(int)
    for packet in packets:
        community_id = packet.get("community_id", "unknown")
        community_id_packet_counts[community_id] += packet["packet_bytes"]
    return community_id_packet_counts

def geoip_lookup(ip: str) -> dict:
    # lookup IP in maxmind database
    try:
        match = geolite2.lookup(ip)
        return match.get_info_dict()
    except Exception:
        print(f"Error looking up geoip for {ip}")
        return None

def convert_to_serializable(obj):
    if isinstance(obj, bytes):
        try:
            # Attempt to decode the byte string
            decoded_string = obj.decode("utf-8")
        except UnicodeDecodeError:
            # Handle the error by replacing the invalid characters
            decoded_string = obj.decode("utf-8", errors="replace")
        return decoded_string
    elif isinstance(obj, datetime.datetime):
        return obj.timestamp()
    else:
        raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

def open_pcap_and_read_it(pcap_path: str) -> list[dict]:
    """
    This function opens a pcap file and reads it.
    :param pcap_path: path to the pcap file
    :return: a list of packets
    """
    # create a list of packets
    p = manuf.MacParser(update=True)
    packets = []
    # open pcap file
    with open(pcap_path, "rb") as pcap_file:
        # read pcap file
        pcap = dpkt.pcap.Reader(pcap_file)
        # for each packet in the pcap file
        for ts, buf in pcap:
            eth = dpkt.ethernet.Ethernet(buf)
            ip = eth.data
            protocol = ip.data
            # do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
            # more_fragments = bool(ip.off & dpkt.ip.IP_MF)
            # fragment_offset = ip.off & dpkt.ip.IP_OFFMASK
            if isinstance(protocol, (dpkt.tcp.TCP, dpkt.udp.UDP)):
                packet = {
                    # dpkt already yields the capture time as epoch seconds
                    "timestamp": ts,
                    "source_ip": socket.inet_ntop(socket.AF_INET, ip.src),
                    "destination_ip": socket.inet_ntop(socket.AF_INET, ip.dst),
                    "source_mac": ":".join("%02x" % b for b in eth.src),
                    "destination_mac": ":".join("%02x" % b for b in eth.dst),
                    "packet_bytes": len(buf),
                    # IP protocol number (6 for TCP, 17 for UDP)
                    "protocol": ip.p,
                    # ensure payload is a human readable string if possible
                    "payload": protocol.data.decode(errors="ignore"),
                }
                # the isinstance check above guarantees this is TCP or UDP
                packet["source_port"] = protocol.sport
                packet["destination_port"] = protocol.dport
                transport = "tcp" if isinstance(protocol, dpkt.tcp.TCP) else "udp"
                packet["community_id"] = calculate_community_id(
                    packet["source_ip"],
                    packet["destination_ip"],
                    packet["source_port"],
                    packet["destination_port"],
                    transport,
                )
                # if http headers in request, collect them in plain text
                if (
                    isinstance(protocol, dpkt.tcp.TCP)
                    and (protocol.dport == 80 or protocol.sport == 80)
                    and "HTTP" in packet["payload"]
                ):
                    headers = packet["payload"].split("\r\n\r\n")[0]
                    # parse the headers and create a dictionary of header name and value
                    headers = dict(
                        header.split(": ", 1) for header in headers.split("\r\n")[1:] if ": " in header
                    )
                    packet["http_headers"] = headers
                    packet["type"] = "http"
                # Extract bodies of SMTP messages if the traffic looks like SMTP
                if isinstance(protocol, dpkt.tcp.TCP) and protocol.dport in [25, 465, 587]:
                    packet["smtp_message"] = packet["payload"]
                if isinstance(protocol, dpkt.udp.UDP) and protocol.dport == 53:
                    dns = dpkt.dns.DNS(protocol.data)
                    if dns.qr == dpkt.dns.DNS_Q:
                        packet["dns_queries"] = []
                        for query in dns.qd:
                            packet["dns_queries"].append(query.name)
                        packet["type"] = "dns"
                if isinstance(protocol, dpkt.tcp.TCP) and (protocol.dport == 22 or protocol.sport == 22):
                    # if the packet is larger than 1000 bytes, flag the SSH exchange as a possible success
                    if packet["packet_bytes"] > 1000:
                        packet["possible_ssh_result"] = "success"
                    else:
                        packet["possible_ssh_result"] = "failure"
                    packet["type"] = "ssh"
if ipaddress.ip_address(packet["source_ip"]).is_global: | |
packet["source_geoip"] = geoip_lookup(packet["source_ip"]) | |
else: | |
packet["source_geoip"] = {} | |
if ipaddress.ip_address(packet["destination_ip"]).is_global: | |
packet["destination_geoip"] = geoip_lookup(packet["destination_ip"]) | |
else: | |
packet["destination_geoip"] = {} | |
try: | |
packet["payload"] = packet["payload"].decode("unicode_escape") | |
except: | |
packet["payload"] = packet["payload"] | |
                try:
                    packet["source_mac_vendor"] = p.get_manuf(packet["source_mac"])
                    packet["destination_mac_vendor"] = p.get_manuf(packet["destination_mac"])
                except Exception:
                    print(f"Error looking up mac vendor for {packet['source_mac']} or {packet['destination_mac']}")
                    raise SystemExit
                packets.append(packet)
    return packets

def main():
    pcap_path = "pcap_files/hp_challenge.pcap"
    packets = open_pcap_and_read_it(pcap_path)
    pcap_contents = {
        "total_packets": len(packets),
        "size_of_pcap": os.path.getsize(pcap_path),
        "file_name": pcap_path,
        "pcap_first_packet_time": packets[0].get("timestamp"),
        "pcap_last_packet_time": packets[-1].get("timestamp"),
        "pcap_duration": str(datetime.timedelta(seconds=packets[-1].get("timestamp") - packets[0].get("timestamp"))),
        "most_active_ethernet_source": Counter(packet["source_mac"] for packet in packets).most_common(1)[0][0],
        "most_active_ethernet_destination": Counter(packet["destination_mac"] for packet in packets).most_common(1)[0][0],
        "most_active_source_ports": [
            {"port_number": k, "count": v}
            for k, v in Counter([packet["source_port"] for packet in packets]).most_common(3)
        ],
        "most_active_destination_ports": [
            {"port_number": k, "count": v}
            for k, v in Counter([packet["destination_port"] for packet in packets]).most_common(3)
        ],
        # Count connections from each source IP to each destination IP
        "connections_by_source_ip": [
            {"source_ip": k, "count": v}
            for k, v in Counter(
                [
                    f'{packet["source_ip"]}:{packet["source_port"]} -> {packet["destination_ip"]}:{packet["destination_port"]}'
                    for packet in packets
                ]
            ).most_common(35)
        ],
        # "total_unique_source_ips": len(set([packet["source_ip"] for packet in packets])),
        "total_unique_destination_ips": len(set([packet["destination_ip"] for packet in packets])),
        "total_unique_local_source_ips": len(
            set([packet["source_ip"] for packet in packets if not ipaddress.ip_address(packet["source_ip"]).is_global])
        ),
        "total_unique_local_destination_ips": len(
            set(
                [
                    packet["destination_ip"]
                    for packet in packets
                    if not ipaddress.ip_address(packet["destination_ip"]).is_global
                ]
            )
        ),
        "total_unique_global_source_ips": len(
            set([packet["source_ip"] for packet in packets if ipaddress.ip_address(packet["source_ip"]).is_global])
        ),
        "total_unique_global_destination_ips": len(
            set(
                [
                    packet["destination_ip"]
                    for packet in packets
                    if ipaddress.ip_address(packet["destination_ip"]).is_global
                ]
            )
        ),
        # Create a dictionary of each dns_query and the timestamp of the packet it occurs in
        "dns_queries": {
            query: [packet["timestamp"] for packet in packets if query in packet.get("dns_queries", [])]
            for query in set([query for packet in packets for query in packet.get("dns_queries", [])])
        },
        "dns_servers_queried": list(
            set([packet["destination_ip"] for packet in packets if packet.get("dns_queries", [])])
        ),
        # Combine the bodies of all SMTP messages observed in the capture
        "combined_smtp_message": "\n".join(
            [packet["smtp_message"] for packet in packets if packet.get("smtp_message", None)]
        ),
        "packets": packets,
        "bytes_per_community_id": calculate_bytes_per_conv(packets),
        "unique_community_ids_per_type": {
            "dns": len(set([packet["community_id"] for packet in packets if packet.get("type", None) == "dns"])),
            "http": len(set([packet["community_id"] for packet in packets if packet.get("type", None) == "http"])),
            "ssh": len(set([packet["community_id"] for packet in packets if packet.get("type", None) == "ssh"])),
        },
    }
    # annotate each packet with the total bytes of the conversation it belongs to
    for packet in pcap_contents["packets"]:
        if packet.get("community_id") in pcap_contents["bytes_per_community_id"]:
            packet["stream_bytes"] = pcap_contents["bytes_per_community_id"][packet["community_id"]]
    # write everything out; convert_to_serializable handles the non-JSON-native types
    with open("metadata.json", "w") as f:
        json.dump(pcap_contents, f, default=convert_to_serializable, indent=4)

if __name__ == "__main__":
    main()
I have multiple versions of this script; see the earlier revisions of this gist for previous iterations.
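If you only want to poke at the script's output, a short sketch like the one below reads the metadata.json it writes and prints a few of the summary fields. It assumes the script has already been run in the same directory.

# Assumes metadata.json was produced by a prior run of the script above.
import json

with open("metadata.json") as f:
    metadata = json.load(f)

print(f"{metadata['total_packets']} packets over {metadata['pcap_duration']}")
print("Most active destination ports:")
for entry in metadata["most_active_destination_ports"]:
    print(f"  {entry['port_number']}: {entry['count']} packets")
print("DNS names queried:")
for name in sorted(metadata["dns_queries"]):
    print(f"  {name}")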
Here's the Pipfile