Skip to content

Instantly share code, notes, and snippets.

@jinnosux
Last active June 10, 2024 11:18
Show Gist options
  • Save jinnosux/63160c7cf9d929f7eb9ce0221917b345 to your computer and use it in GitHub Desktop.
Save jinnosux/63160c7cf9d929f7eb9ce0221917b345 to your computer and use it in GitHub Desktop.
entro.py
"""
Network Packets Entropy Analyzer
Author: Vahid Konicanin
Part of a Research Paper "Detecting Network Anomalies with Shannon Entropy: A Novel Approach to Cybersecurity"
International Balkan University, 2024
"""
import math
import socket
import statistics

import dpkt
import hexdump
def print_colored(text, color):
    """Return *text* wrapped in the ANSI escape codes for *color*.

    Nothing is printed here; the caller is expected to print the returned
    string. *color* must be a key of the internal table ('red' or 'reset'),
    otherwise a KeyError is raised.
    """
    ansi_codes = {
        'red': '\033[91m',
        'reset': '\033[0m',
    }
    start = ansi_codes[color]
    stop = ansi_codes['reset']
    # The trailing reset code stops the color from leaking into later output.
    return "{}{}{}".format(start, text, stop)
def calculate_entropy(string):
    """Return the Shannon entropy of *string*, in bits per symbol.

    Each distinct symbol's relative frequency p contributes -p * log2(p).
    An empty input has no symbols, so the sum is empty and the result is 0.
    """
    symbols = set(string)
    length = len(string)
    # Relative frequency of every distinct symbol in the input.
    probabilities = (float(string.count(symbol)) / length for symbol in symbols)
    # Shannon's formula: H = -sum(p * log2(p)).
    return -sum(p * math.log2(p) for p in probabilities)
def calculate_normalized_entropy(string):
    """Return the Shannon entropy of *string* scaled into the range [0, 1].

    The entropy is divided by log2(k), the maximum entropy attainable with
    the k distinct symbols that actually occur in *string*.

    Inputs with fewer than two distinct symbols (including the empty string)
    carry no uncertainty, and log2(k) would be zero or undefined for them,
    so 0.0 is returned instead of raising ZeroDivisionError / ValueError.
    """
    symbols = set(string)
    if len(symbols) < 2:
        return 0.0

    length = len(string)
    freqs = [float(string.count(symbol)) / length for symbol in symbols]
    entropy = -sum(freq * math.log2(freq) for freq in freqs)

    # Maximum possible entropy for an alphabet of this size (uniform usage).
    max_entropy = math.log2(len(symbols))
    return entropy / max_entropy
def process_pcap(file_path, num_packets):
    """Print the average normalized entropy per source IP of a pcap capture.

    At most *num_packets* packets are read from the pcap file at *file_path*.
    Every Ethernet frame that carries an IPv4 payload is converted to a hex
    string, its normalized entropy is computed, and the value is grouped
    under the frame's source address. The global mean and standard deviation
    are printed, followed by one line per source IP; addresses whose average
    exceeds ``mean + stdev`` are highlighted in red.

    Args:
        file_path: Path of the pcap file to read.
        num_packets: Maximum number of packets to read from the capture.

    Returns:
        None. All results are written to standard output.
    """
    entropy_info = {}    # source IP -> list of normalized entropies
    entropy_values = []  # every normalized entropy, across all source IPs

    with open(file_path, 'rb') as pcap_file:
        pcap = dpkt.pcap.Reader(pcap_file)
        for i, (_timestamp, packet) in enumerate(pcap):
            if i >= num_packets:
                break

            # Only frames with an IPv4 payload have a source address that
            # socket.inet_ntoa can format; skip everything else.
            eth = dpkt.ethernet.Ethernet(packet)
            if not isinstance(eth.data, dpkt.ip.IP):
                continue
            ip_address = socket.inet_ntoa(eth.data.src)

            # Convert packet data to a hex string.
            # NOTE(review): hexdump.dump appears to return only space-separated
            # hex byte pairs (no offset column), in which case [1:] drops the
            # packet's first byte - confirm whether that is intended.
            packet_string = ''.join(hexdump.dump(packet).split()[1:])

            normalized_entropy = calculate_normalized_entropy(packet_string)
            entropy_info.setdefault(ip_address, []).append(normalized_entropy)
            entropy_values.append(normalized_entropy)

    # statistics.mean needs at least one value; bail out cleanly otherwise.
    if not entropy_values:
        print("No IPv4 packets found; nothing to analyze.")
        return

    global_avg_normalized_entropy = statistics.mean(entropy_values)
    # statistics.stdev needs at least two values; a single sample has no spread.
    if len(entropy_values) > 1:
        global_std_dev = statistics.stdev(entropy_values)
    else:
        global_std_dev = 0.0

    # Anything more than one standard deviation above the mean is an outlier.
    outlier_threshold = global_avg_normalized_entropy + global_std_dev

    print(f"Global Average Normalized Entropy: {global_avg_normalized_entropy}")
    print(f"Global Std Dev of Normalized Entropy: {global_std_dev}")

    for ip_address, normalized_entropies in entropy_info.items():
        avg_ip_normalized_entropy = sum(normalized_entropies) / len(normalized_entropies)
        avg_ip_normalized_entropy_str = f"{avg_ip_normalized_entropy:.4f}"
        ip_address_str = f"{ip_address}"
        # Highlight source IPs whose average is above the outlier threshold.
        if avg_ip_normalized_entropy > outlier_threshold:
            avg_ip_normalized_entropy_str = print_colored(avg_ip_normalized_entropy_str, 'red')
            ip_address_str = print_colored(ip_address_str, 'red')
        print(f"IP Address: {ip_address_str}, Average Normalized Entropy: {avg_ip_normalized_entropy_str}")
if __name__ == "__main__":
    # process_pcap looks up `socket` as a module-level name (for inet_ntoa),
    # so it has to be bound before the call below.
    import socket

    # Upper bound on how many packets are read from the capture.
    num_packets_to_process = 10000
    # Capture file to analyze, resolved relative to the working directory.
    pcap_file_path = "wireshark_Ethernet_2_00229.pcap"

    process_pcap(pcap_file_path, num_packets_to_process)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment