Skip to content

Instantly share code, notes, and snippets.

@remotephone
Last active June 30, 2023 04:16
Show Gist options
  • Save remotephone/f6fcc5e5f2c11eb702df2db7f7d728d4 to your computer and use it in GitHub Desktop.
Save remotephone/f6fcc5e5f2c11eb702df2db7f7d728d4 to your computer and use it in GitHub Desktop.
A script mostly written with copilot to parse and process packet captures.
# Updated version for https://blog.rmtph.one/posts/CyberDefenders_EscapeRoom/
import datetime
import ipaddress
import json
import os
import socket
from collections import Counter, defaultdict
import communityid
import dpkt
from geoip import geolite2
from manuf import manuf
def calculate_community_id(src_ip, dst_ip, src_port, dest_port) -> str:
    """Return the Community ID flow hash for a TCP connection.

    The two endpoints are packed into a TCP flow tuple, which is then hashed
    by a default-configured ``communityid.CommunityID`` instance.

    :param src_ip: source IP address
    :param dst_ip: destination IP address
    :param src_port: source TCP port
    :param dest_port: destination TCP port
    :return: the value produced by ``CommunityID.calc`` for the flow
    """
    flow = communityid.FlowTuple.make_tcp(src_ip, dst_ip, src_port, dest_port)
    return communityid.CommunityID().calc(flow)
def calculate_bytes_per_conv(packets: list[dict]) -> dict:
    """Total the captured bytes of the packets, grouped by Community ID.

    Packets that carry no ``community_id`` key are accumulated under the
    ``"unknown"`` bucket.

    :param packets: packet dicts, each providing a ``packet_bytes`` entry
    :return: a ``defaultdict(int)`` mapping community id to summed bytes
    """
    bytes_by_flow = defaultdict(int)
    for entry in packets:
        flow_key = entry["community_id"] if "community_id" in entry else "unknown"
        bytes_by_flow[flow_key] = bytes_by_flow[flow_key] + entry["packet_bytes"]
    return bytes_by_flow
def geoip_lookup(ip: str) -> dict | None:
    """Look up an IP address with ``geolite2.lookup``.

    The lookup is best effort: a failure is reported on stdout and turned
    into ``None`` so a single bad address does not abort the caller.

    :param ip: the IP address to look up, as a string
    :return: the record's info dictionary, or ``None`` when the address has
        no record or the lookup raised
    """
    try:
        match = geolite2.lookup(ip)
        # lookup() yields None for addresses it has no record for; report
        # that case explicitly instead of tripping over None.get_info_dict()
        if match is None:
            print(f"No geoip record found for {ip}")
            return None
        return match.get_info_dict()
    except Exception as exc:
        # Exception (not a bare except) so Ctrl-C and SystemExit still propagate
        print(f"Error looking up geoip for {ip}: {exc}")
        return None
def convert_to_serializable(obj):
    """``default`` hook for ``json.dump``: convert values JSON cannot encode.

    :param obj: the value the JSON encoder could not serialize on its own
    :return: for ``bytes``, the UTF-8 decoded text with every invalid
        sequence replaced by U+FFFD; for ``datetime.datetime``, its POSIX
        timestamp as returned by ``datetime.timestamp()``
    :raises TypeError: if ``obj`` is neither ``bytes`` nor a ``datetime``
    """
    if isinstance(obj, bytes):
        # errors="replace" yields exactly the strict result for valid UTF-8,
        # so no separate strict attempt (or "ignore" pass) is needed
        return obj.decode("utf-8", errors="replace")
    if isinstance(obj, datetime.datetime):
        return obj.timestamp()
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
def _parse_http_headers(text: str) -> dict:
    """Parse the header section of an HTTP message into a name -> value dict."""
    head = text.split("\r\n\r\n")[0]
    headers = {}
    # The first line is the request/status line; every later line is split on
    # its first colon, and lines without a colon are skipped rather than
    # aborting the whole capture.
    for line in head.split("\r\n")[1:]:
        name, separator, value = line.partition(":")
        if separator:
            headers[name.strip()] = value.strip()
    return headers


def open_pcap_and_read_it(pcap_path: str) -> list[dict]:
    """
    Open a pcap file and describe every IPv4 TCP/UDP packet found in it.

    Frames that are not IPv4, or whose payload is neither TCP nor UDP, are
    skipped. Each returned dict holds: ``timestamp`` (capture time in epoch
    seconds), ``source_ip``/``destination_ip``, ``source_mac``/
    ``destination_mac``, ``packet_bytes`` (frame length), ``protocol`` (IP
    protocol number), ``payload`` (payload decoded as text, undecodable bytes
    dropped), ``source_port``/``destination_port``, ``community_id``,
    ``source_geoip``/``destination_geoip`` and ``source_mac_vendor``/
    ``destination_mac_vendor``. Depending on the traffic it may also hold
    ``http_headers``, ``smtp_message``, ``dns_queries``,
    ``possible_ssh_result`` and ``type`` (``"http"``, ``"dns"`` or ``"ssh"``).

    :param pcap_path: path to the pcap file
    :return: a list of packet dicts, in capture order
    :raises SystemExit: if the MAC vendor lookup fails for a packet
    """
    # NOTE(review): update=True presumably makes manuf refresh its vendor
    # database (likely over the network) on every run - confirm.
    mac_parser = manuf.MacParser(update=True)
    community_hasher = communityid.CommunityID()
    packets = []
    with open(pcap_path, "rb") as pcap_file:
        for ts, buf in dpkt.pcap.Reader(pcap_file):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
            except dpkt.dpkt.UnpackError:
                # frame too short to hold an Ethernet header
                continue
            ip = eth.data
            # Only IPv4 is handled: the addresses are rendered with AF_INET
            if not isinstance(ip, dpkt.ip.IP):
                continue
            protocol = ip.data
            is_tcp = isinstance(protocol, dpkt.tcp.TCP)
            is_udp = isinstance(protocol, dpkt.udp.UDP)
            if not (is_tcp or is_udp):
                continue

            # Decode once; the same text feeds the payload, HTTP and SMTP fields
            payload_text = protocol.data.decode(errors="ignore")
            packet = {
                # The pcap timestamp already is epoch seconds. Round-tripping
                # it through a naive UTC datetime would shift it by the local
                # UTC offset.
                "timestamp": float(ts),
                "source_ip": socket.inet_ntop(socket.AF_INET, ip.src),
                "destination_ip": socket.inet_ntop(socket.AF_INET, ip.dst),
                "source_mac": eth.src.hex(":"),
                "destination_mac": eth.dst.hex(":"),
                "packet_bytes": len(buf),
                # Read from the parsed header instead of a fixed byte offset,
                # which is wrong for VLAN-tagged frames
                "protocol": ip.p,
                "payload": payload_text,
                "source_port": protocol.sport,
                "destination_port": protocol.dport,
            }

            if is_tcp:
                packet["community_id"] = calculate_community_id(
                    packet["source_ip"],
                    packet["destination_ip"],
                    packet["source_port"],
                    packet["destination_port"],
                )
            else:
                # calculate_community_id only builds TCP tuples, so UDP flows
                # are hashed here with the UDP flow tuple
                packet["community_id"] = community_hasher.calc(
                    communityid.FlowTuple.make_udp(
                        packet["source_ip"],
                        packet["destination_ip"],
                        packet["source_port"],
                        packet["destination_port"],
                    )
                )

            # if http headers in request, collect them in plain text
            if is_tcp and 80 in (protocol.sport, protocol.dport) and "HTTP" in payload_text:
                packet["http_headers"] = _parse_http_headers(payload_text)
                packet["type"] = "http"

            # Extract bodies of SMTP messages if message is smtp
            if is_tcp and protocol.dport in (25, 465, 587):
                packet["smtp_message"] = payload_text

            if is_udp and protocol.dport == 53:
                try:
                    dns = dpkt.dns.DNS(protocol.data)
                except (dpkt.dpkt.UnpackError, IndexError):
                    # malformed or truncated DNS payload: keep the packet,
                    # just without DNS details
                    dns = None
                if dns is not None:
                    if dns.qr == dpkt.dns.DNS_Q:
                        packet["dns_queries"] = [query.name for query in dns.qd]
                    packet["type"] = "dns"

            if is_tcp and 22 in (protocol.sport, protocol.dport):
                # Heuristic: a frame larger than 1000 bytes on port 22 is
                # labelled a possible successful session, anything smaller a
                # failure
                if packet["packet_bytes"] > 1000:
                    packet["possible_ssh_result"] = "success"
                else:
                    packet["possible_ssh_result"] = "failure"
                packet["type"] = "ssh"

            # Only globally routable addresses are worth a geoip lookup
            for side in ("source", "destination"):
                address = packet[f"{side}_ip"]
                if ipaddress.ip_address(address).is_global:
                    packet[f"{side}_geoip"] = geoip_lookup(address)
                else:
                    packet[f"{side}_geoip"] = {}

            try:
                packet["source_mac_vendor"] = mac_parser.get_manuf(packet["source_mac"])
                packet["destination_mac_vendor"] = mac_parser.get_manuf(packet["destination_mac"])
            except Exception as exc:
                print(f"Error looking up mac vendor for {packet['source_mac']} or {packet['destination_mac']}")
                # A failed vendor lookup aborts the run; exit non-zero so the
                # failure is visible to the calling shell
                raise SystemExit(1) from exc

            packets.append(packet)
    return packets
def _most_common_value(values):
    """Return the most frequent item of *values*, or None when it is empty."""
    ranked = Counter(values).most_common(1)
    return ranked[0][0] if ranked else None


def _count_unique_community_ids(packets, packet_type):
    """Count distinct community ids among packets tagged with *packet_type*."""
    return len(
        {
            packet["community_id"]
            for packet in packets
            if packet.get("type") == packet_type and "community_id" in packet
        }
    )


def main(pcap_path: str = "pcap_files/hp_challenge.pcap", output_path: str = "metadata.json") -> None:
    """Summarise a packet capture and write the summary as JSON.

    The capture is parsed with ``open_pcap_and_read_it``; aggregate statistics
    (time span, busiest MACs and ports, connection counts, unique local and
    global IP counts, DNS queries, SMTP text, bytes per community id) are
    computed and written, together with the per-packet records, to
    ``output_path``.

    :param pcap_path: path of the capture file to analyse
    :param output_path: path of the JSON file to write
    """
    packets = open_pcap_and_read_it(pcap_path)
    bytes_per_community_id = calculate_bytes_per_conv(packets)

    # An empty capture has no first/last packet; report None instead of crashing
    if packets:
        first_time = packets[0]["timestamp"]
        last_time = packets[-1]["timestamp"]
        duration = str(datetime.timedelta(seconds=last_time - first_time))
    else:
        first_time = last_time = duration = None

    def is_global(address: str) -> bool:
        return ipaddress.ip_address(address).is_global

    source_ips = {packet["source_ip"] for packet in packets}
    destination_ips = {packet["destination_ip"] for packet in packets}

    # One pass over the packets: each query name maps to the timestamps of the
    # packets that asked for it (a name repeated inside one packet counts once)
    dns_query_times = defaultdict(list)
    for packet in packets:
        for name in dict.fromkeys(packet.get("dns_queries", [])):
            dns_query_times[name].append(packet["timestamp"])

    connection_counts = Counter(
        f'{packet["source_ip"]}:{packet["source_port"]} -> {packet["destination_ip"]}:{packet["destination_port"]}'
        for packet in packets
    )

    pcap_contents = {
        "total_packets": len(packets),
        "size_of_pcap": os.path.getsize(pcap_path),
        "file_name": pcap_path,
        "pcap_first_packet_time": first_time,
        "pcap_last_packet_time": last_time,
        "pcap_duration": duration,
        # Most frequent MAC, not the lexicographically largest one
        "most_active_ethernet_source": _most_common_value(packet["source_mac"] for packet in packets),
        "most_active_ethernet_destination": _most_common_value(packet["destination_mac"] for packet in packets),
        "most_active_source_ports": [
            {"port_number": port, "count": count}
            for port, count in Counter(packet["source_port"] for packet in packets).most_common(3)
        ],
        "most_active_destination_ports": [
            {"port_number": port, "count": count}
            for port, count in Counter(packet["destination_port"] for packet in packets).most_common(3)
        ],
        # Count packets per "source:port -> destination:port" pair; the
        # "source_ip" key name is kept for compatibility with existing output
        "connections_by_source_ip": [
            {"source_ip": connection, "count": count} for connection, count in connection_counts.most_common(35)
        ],
        "total_unique_destination_ips": len(destination_ips),
        "total_unique_local_source_ips": sum(1 for address in source_ips if not is_global(address)),
        "total_unique_local_destination_ips": sum(1 for address in destination_ips if not is_global(address)),
        "total_unique_global_source_ips": sum(1 for address in source_ips if is_global(address)),
        "total_unique_global_destination_ips": sum(1 for address in destination_ips if is_global(address)),
        "dns_queries": dict(dns_query_times),
        "dns_servers_queried": sorted({packet["destination_ip"] for packet in packets if packet.get("dns_queries")}),
        # All captured SMTP payloads joined in capture order
        "combined_smtp_message": "\n".join(
            packet["smtp_message"] for packet in packets if packet.get("smtp_message")
        ),
        "packets": packets,
        "bytes_per_community_id": bytes_per_community_id,
        "unique_community_ids_per_type": {
            "dns": _count_unique_community_ids(packets, "dns"),
            "http": _count_unique_community_ids(packets, "http"),
            "ssh": _count_unique_community_ids(packets, "ssh"),
        },
    }

    # Annotate each packet with the total bytes of the flow it belongs to
    for packet in packets:
        community_id = packet.get("community_id")
        if community_id is not None:
            packet["stream_bytes"] = bytes_per_community_id[community_id]

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(pcap_contents, f, default=convert_to_serializable, indent=4)
# Run the capture summary only when executed as a script, not on import.
if __name__ == "__main__":
    main()
@remotephone
Copy link
Author

Here's the Pipfile

[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
dpkt = "*"
python-geoip-geolite2 = "*"
python-geoip-python3 = "*"
mac-vendor-lookup = "*"
manuf = "*"
communityid = "*"

[dev-packages]

[requires]
python_version = "3.11"

@remotephone
Copy link
Author

I have multiple versions of this script; see the gist's earlier revisions for previous iterations.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment