A script, mostly written with Copilot, to parse and process packet captures.
# Updated version for
import datetime
import ipaddress
import json
import os
import socket
from collections import Counter, defaultdict
import communityid
import dpkt
from geoip import geolite2
from manuf import manuf
def calculate_community_id(src_ip, dst_ip, src_port, dest_port) -> str:
    """
    Compute the Community ID flow hash for a TCP flow.

    :param src_ip: source IP address of the flow
    :param dst_ip: destination IP address of the flow
    :param src_port: source TCP port
    :param dest_port: destination TCP port
    :return: the value produced by ``communityid.CommunityID().calc`` for the TCP flow tuple
    """
    # The flow tuple is always built as TCP; the hasher uses the library's default settings.
    flow = communityid.FlowTuple.make_tcp(src_ip, dst_ip, src_port, dest_port)
    return communityid.CommunityID().calc(flow)
def calculate_bytes_per_conv(packets: list[dict]) -> dict:
    """
    Sum the bytes seen per conversation (community ID).

    :param packets: packet dicts; each must carry "packet_bytes" and may carry "community_id"
    :return: mapping of community ID to total bytes; packets without an ID are summed under "unknown"
    """
    totals = defaultdict(int)
    for pkt in packets:
        totals[pkt.get("community_id", "unknown")] += pkt["packet_bytes"]
    return totals
def geoip_lookup(ip: str) -> dict | None:
    """
    Look up an IP address in the bundled MaxMind GeoLite2 database.

    :param ip: IP address in string form
    :return: the info dict for the address, or None when the lookup fails or finds nothing
    """
    try:
        match = geolite2.lookup(ip)
    except ValueError:
        # NOTE(review): python-geoip is documented to raise ValueError for a malformed
        # address -- confirm against the installed version.
        match = None

    # lookup() yields None for addresses that are not in the database, so guard
    # before calling get_info_dict().
    if match is None:
        print(f"Error looking up geoip for {ip}")
        return None
    return match.get_info_dict()
def convert_to_serializable(obj):
    """
    Fallback serializer for ``json.dump(..., default=convert_to_serializable)``.

    :param obj: an object the json module could not serialize on its own
    :return: a str for bytes input, or a POSIX timestamp (float) for datetime input
    :raises TypeError: for any other type, as the json ``default`` hook contract requires
    """
    if isinstance(obj, bytes):
        # Valid UTF-8 decodes unchanged; invalid sequences become U+FFFD instead of
        # raising, so a single call covers both the normal and the fallback path.
        return obj.decode("utf-8", errors="replace")
    if isinstance(obj, datetime.datetime):
        return obj.timestamp()
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
def open_pcap_and_read_it(pcap_path: str) -> list[dict]:
    """
    This function opens a pcap file and reads it.

    Only IPv4 packets carrying TCP or UDP are kept; every other frame is skipped.
    Each kept packet becomes a dict with addressing, size, payload, GeoIP and MAC
    vendor information, plus protocol-specific fields (HTTP headers, SMTP message,
    DNS queries, SSH heuristic) when they apply.

    :param pcap_path: path to the pcap file
    :return: a list of packets
    :raises SystemExit: if the MAC vendor lookup fails for a packet
    """
    # NOTE(review): update=True presumably makes manuf refresh its vendor database
    # when the parser is created (network access) -- confirm against the manuf docs.
    mac_parser = manuf.MacParser(update=True)
    packets = []

    with open(pcap_path, "rb") as pcap_file:
        for ts, buf in dpkt.pcap.Reader(pcap_file):
            eth = dpkt.ethernet.Ethernet(buf)
            ip = eth.data
            # Addresses are rendered with AF_INET below, so anything that is not
            # IPv4 (ARP, IPv6, ...) cannot be processed and is skipped.
            if not isinstance(ip, dpkt.ip.IP):
                continue
            protocol = ip.data
            if not isinstance(protocol, (dpkt.tcp.TCP, dpkt.udp.UDP)):
                continue
            is_tcp = isinstance(protocol, dpkt.tcp.TCP)

            packet = {
                # The capture timestamp is already seconds since the epoch; routing it
                # through a naive UTC datetime would shift it by the local UTC offset.
                "timestamp": float(ts),
                "source_ip": socket.inet_ntop(socket.AF_INET, ip.src),
                "destination_ip": socket.inet_ntop(socket.AF_INET, ip.dst),
                "source_mac": ":".join("%02x" % b for b in eth.src),
                "destination_mac": ":".join("%02x" % b for b in eth.dst),
                "packet_bytes": len(buf),
                # IP protocol number taken from the parsed header rather than a fixed
                # byte offset into the frame, which breaks on e.g. VLAN-tagged frames.
                "protocol": ip.p,
                "payload": protocol.data,
                "source_port": protocol.sport,
                "destination_port": protocol.dport,
            }

            if is_tcp:
                packet["community_id"] = calculate_community_id(
                    packet["source_ip"],
                    packet["destination_ip"],
                    packet["source_port"],
                    packet["destination_port"],
                )

                # if http headers in request, collect them in plain text
                if 80 in (protocol.sport, protocol.dport):
                    text = protocol.data.decode(errors="ignore")
                    if "HTTP" in text:
                        header_block = text.split("\r\n\r\n")[0]
                        headers = {}
                        # First line is the request/status line; the rest are "Name: value".
                        for line in header_block.split("\r\n")[1:]:
                            name, separator, value = line.partition(": ")
                            if separator:  # ignore lines that are not header fields
                                headers[name] = value
                        packet["http_headers"] = headers
                        packet["type"] = "http"

                # Extract bodies of SMTP messages if message is smtp
                if protocol.dport in (25, 465, 587):
                    packet["smtp_message"] = protocol.data.decode(errors="ignore")

                if 22 in (protocol.sport, protocol.dport):
                    # Size heuristic: frames larger than 1000 bytes are flagged as a
                    # possible successful SSH session, anything smaller as a failure.
                    if packet["packet_bytes"] > 1000:
                        packet["possible_ssh_result"] = "success"
                    else:
                        packet["possible_ssh_result"] = "failure"
                    packet["type"] = "ssh"
            elif protocol.dport == 53:
                # UDP towards port 53: try to parse the payload as DNS.
                try:
                    dns = dpkt.dns.DNS(protocol.data)
                except dpkt.UnpackError:
                    dns = None  # not parseable as DNS; keep the packet without DNS fields
                if dns is not None and dns.qr == dpkt.dns.DNS_Q:
                    packet["dns_queries"] = [query.name for query in dns.qd]
                    packet["type"] = "dns"

            if ipaddress.ip_address(packet["source_ip"]).is_global:
                packet["source_geoip"] = geoip_lookup(packet["source_ip"])
            else:
                packet["source_geoip"] = {}
            if ipaddress.ip_address(packet["destination_ip"]).is_global:
                packet["destination_geoip"] = geoip_lookup(packet["destination_ip"])
            else:
                packet["destination_geoip"] = {}

            # ensure payload is a human readable string if possible
            try:
                packet["payload"] = packet["payload"].decode("unicode_escape")
            except UnicodeDecodeError:
                pass  # payload stays as raw bytes

            try:
                packet["source_mac_vendor"] = mac_parser.get_manuf(packet["source_mac"])
                packet["destination_mac_vendor"] = mac_parser.get_manuf(packet["destination_mac"])
            except ValueError as err:
                # NOTE(review): assumes manuf signals an unparseable MAC with ValueError -- confirm.
                print(f"Error looking up mac vendor for {packet['source_mac']} or {packet['destination_mac']}")
                raise SystemExit from err

            packets.append(packet)

    return packets
def main():
    """
    Parse the capture, build summary statistics and write them to metadata.json.

    The output contains capture-level totals (packet count, duration, busiest MACs,
    ports and connections, unique address counts), a DNS query index, the combined
    SMTP text, per-conversation byte counts and the full list of parsed packets.

    :raises SystemExit: if no packets could be read from the capture
    """
    pcap_path = "pcap_files/hp_challenge.pcap"
    packets = open_pcap_and_read_it(pcap_path)
    if not packets:
        # Everything below indexes packets[0] / packets[-1]; fail with a clear message instead.
        raise SystemExit(f"No packets could be read from {pcap_path}")

    first_timestamp = packets[0].get("timestamp")
    last_timestamp = packets[-1].get("timestamp")

    def is_global(address: str) -> bool:
        return ipaddress.ip_address(address).is_global

    source_ips = {packet["source_ip"] for packet in packets}
    destination_ips = {packet["destination_ip"] for packet in packets}
    global_source_ips = {address for address in source_ips if is_global(address)}
    global_destination_ips = {address for address in destination_ips if is_global(address)}

    # Single pass over the packets for everything DNS-related and for the
    # per-type community ID sets, instead of rescanning the list per query/type.
    dns_query_times = defaultdict(list)
    dns_servers = set()
    community_ids_by_type = defaultdict(set)
    for packet in packets:
        queries = packet.get("dns_queries", [])
        if queries:
            dns_servers.add(packet["destination_ip"])
        # dict.fromkeys de-duplicates so a packet contributes one timestamp per query name.
        for query in dict.fromkeys(queries):
            dns_query_times[query].append(packet["timestamp"])
        # Packets without a community ID are not counted towards any type.
        if "type" in packet and "community_id" in packet:
            community_ids_by_type[packet["type"]].add(packet["community_id"])

    bytes_per_community_id = calculate_bytes_per_conv(packets)

    connection_counts = Counter(
        f'{str(packet["source_ip"])}:{str(packet["source_port"])} -> {str(packet["destination_ip"])}:{str(packet["destination_port"])}'
        for packet in packets
    )

    pcap_contents = {
        "total_packets": len(packets),
        "size_of_pcap": os.path.getsize(pcap_path),
        "file_name": pcap_path,
        "pcap_first_packet_time": first_timestamp,
        "pcap_last_packet_time": last_timestamp,
        "pcap_duration": str(datetime.timedelta(seconds=last_timestamp - first_timestamp)),
        # "Most active" means most frequently seen, so count occurrences rather than
        # taking max() of the MAC strings (which only gives the lexicographically largest).
        "most_active_ethernet_source": Counter(packet["source_mac"] for packet in packets).most_common(1)[0][0],
        "most_active_ethernet_destination": Counter(packet["destination_mac"] for packet in packets).most_common(1)[0][0],
        "most_active_source_ports": [
            {"port_number": k, "count": v}
            for k, v in Counter(packet["source_port"] for packet in packets).most_common(3)
        ],
        "most_active_destination_ports": [
            {"port_number": k, "count": v}
            for k, v in Counter(packet["destination_port"] for packet in packets).most_common(3)
        ],
        # Count connections from each source IP:port to each destination IP:port
        "connections_by_source_ip": [{"source_ip": k, "count": v} for k, v in connection_counts.most_common(35)],
        # "total_unique_source_ips": len(set([packet["source_ip"] for packet in packets])),
        "total_unique_destination_ips": len(destination_ips),
        "total_unique_local_source_ips": len(source_ips - global_source_ips),
        "total_unique_local_destination_ips": len(destination_ips - global_destination_ips),
        "total_unique_global_source_ips": len(global_source_ips),
        "total_unique_global_destination_ips": len(global_destination_ips),
        # Each DNS query name mapped to the timestamps of the packets it occurs in
        "dns_queries": dict(dns_query_times),
        "dns_servers_queried": list(dns_servers),
        "combined_smtp_message": "\n".join(
            packet["smtp_message"] for packet in packets if packet.get("smtp_message", None)
        ),
        "packets": packets,
        "bytes_per_community_id": bytes_per_community_id,
        "unique_community_ids_per_type": {
            "dns": len(community_ids_by_type["dns"]),
            "http": len(community_ids_by_type["http"]),
            "ssh": len(community_ids_by_type["ssh"]),
        },
    }

    # Annotate every packet with the byte total of its conversation. The totals live
    # on the summary (not on the packets), keyed the same way calculate_bytes_per_conv
    # keys them: community ID, or "unknown" when the packet has none.
    for packet in pcap_contents["packets"]:
        packet["stream_bytes"] = bytes_per_community_id[packet.get("community_id", "unknown")]

    # ensure packet content is json serializable
    with open("metadata.json", "w") as f:
        json.dump(pcap_contents, f, default=convert_to_serializable, indent=4)
# Run the analysis only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Here's the Pipfile

[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
dpkt = "*"
python-geoip-geolite2 = "*"
python-geoip-python3 = "*"
mac-vendor-lookup = "*"
manuf = "*"


[requires]
python_version = "3.11"

I have multiple versions of this script; see the earlier versions for previous iterations of it.

