@danield137
Last active May 6, 2024 22:03
Analyze NSG flow logs
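
Two Python scripts: the first walks a folder of Azure NSG flow-log JSON blobs (v2 schema), keeps accepted, completed outbound TCP flows, resolves each destination IP to a continent via ip-api.com / ipapi.co, and projects inter- and intra-continental egress cost; the second is a stand-alone variant that only extracts the raw flow tuples to flows.csv.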
import abc
import json
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import requests
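
# Expected input: Azure NSG flow-log (v2) JSON blobs. Illustrative sketch of a
# record, trimmed to the fields this script actually reads:
#   {"records": [{"time": "...", "properties": {"Version": 2, "flows": [
#     {"rule": "DefaultRule_AllowInternetOutBound", "flows": [
#       {"flowTuples": ["1542110377,10.0.0.4,13.67.143.118,44931,443,T,O,A,E,30,6000,20,4000"]}]}]}}]}
# Flow-tuple fields (v2, comma-separated): unix timestamp, source IP, dest IP,
# source port, dest port, protocol (T=TCP/U=UDP), direction (I=in/O=out),
# decision (A=allowed/D=denied), state (B=begin/C=continuing/E=end),
# packets sent, bytes sent, packets received, bytes received.
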
def process_file(path: str) -> List[dict]:
    if not path.endswith(".json"):
        raise ValueError("File must be in JSON format")
    with open(path) as f:
        data_dict = json.load(f)
    # Flatten the JSON structure and extract flow tuples into a list of dicts
    flow_data = []
    for record in data_dict['records']:
        version = record['properties'].get('Version')
        if not version or int(version) != 2:
            # print(f"Unsupported version: {version}")
            continue
        for flow in record['properties']['flows']:
            rule = flow['rule']
            if rule != 'DefaultRule_AllowInternetOutBound':
                # print(f"Unsupported rule: {rule}")
                continue
            for subflow in flow['flows']:
                for tuple_str in subflow['flowTuples']:
                    tuple_data = tuple_str.split(',')
                    protocol = tuple_data[5]
                    direction = tuple_data[6]
                    decision = tuple_data[7]
                    state = tuple_data[8]
                    # we only care about completed (state 'E') outbound TCP connections
                    # that were accepted - those carry the byte counters
                    if protocol == 'T' and direction == 'O' and decision == 'A' and state == 'E':
                        flow_data.append({
                            'time': tuple_data[0],
                            'source_ip': tuple_data[1],
                            'dest_ip': tuple_data[2],
                            'source_port': int(tuple_data[3]),
                            'dest_port': int(tuple_data[4]),
                            'protocol': tuple_data[5],
                            'traffic_flow': tuple_data[6],
                            'traffic_decision': tuple_data[7],
                            'flow_state': tuple_data[8],
                            'packets_sent': int(tuple_data[9]) if tuple_data[9].isdigit() else 0,
                            'bytes_sent': int(tuple_data[10]) if tuple_data[10].isdigit() else 0,
                            'packets_received': int(tuple_data[11]) if tuple_data[11].isdigit() else 0,
                            'bytes_received': int(tuple_data[12]) if tuple_data[12].isdigit() else 0,
                        })
    return flow_data
def format_bytes(value: float, *args: Any, **kwargs: Any) -> str:
    """
    Converts a byte value into a human-readable string with the appropriate unit.

    :param value: The size in bytes to be converted.
    :return: A human-readable string representing the size.
    """
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    size = float(value)
    for unit in units[:-1]:
        if size < 1024:
            return f"{size:.4f} {unit}"
        size /= 1024.0
    return f"{size:.4f} {units[-1]}"
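
# e.g. format_bytes(1536) -> "1.5000 KB"; format_bytes(3 * 1024 ** 3) -> "3.0000 GB"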
@dataclass
class IpGeo:
    status: str
    continent: Optional[str]
    country: Optional[str]
    region: Optional[str]
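
# Rough per-GB internet egress rates keyed by two-letter continent code - assumed
# ballpark figures for estimation only, not official cloud pricing.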
INTER_CONTINENTAL_COSTS_PER_GB = {
    "NA": 0.05,
    "EU": 0.05,
    "AS": 0.08,
    "OC": 0.08,
    "AF": 0.08,
    "SA": 0.16,
}

INTRA_CONTINENTAL_COSTS_PER_GB = {
    "NA": 0.02,
    "EU": 0.02,
    "AS": 0.08,
    "OC": 0.08,
    "ME": 0.08,
    "AF": 0.08,
    "SA": 0.16,
}

BYTES_IN_GB = 1024 * 1024 * 1024
def estimate_cost(home: IpGeo, traffic: pd.DataFrame) -> Dict[str, Tuple[float, float]]:
    """
    Given the home location and the traffic data, estimate the cost of the traffic.

    `traffic` must carry 'time' (as datetimes), 'continent' and 'bytes_sent' columns.
    Returns {'inter-continental': (observed_cost, projected_daily_cost),
             'intra-continental': (observed_cost, projected_daily_cost)}.
    """
    inter_cost = 0.0
    intra_cost = 0.0
    inter_continental_cost_per_gb = INTER_CONTINENTAL_COSTS_PER_GB[home.continent]
    intra_continental_cost_per_gb = INTRA_CONTINENTAL_COSTS_PER_GB[home.continent]
    min_time, max_time = traffic['time'].min(), traffic['time'].max()
    # guard against a zero-length capture window
    hours = max((max_time - min_time).total_seconds() / 3600, 1 / 3600)
    for _, row in traffic.iterrows():
        if row['continent'] != home.continent:
            inter_cost += row['bytes_sent'] / BYTES_IN_GB * inter_continental_cost_per_gb
        else:
            intra_cost += row['bytes_sent'] / BYTES_IN_GB * intra_continental_cost_per_gb
    # extrapolate the observed window to a full day
    daily_inter_cost = inter_cost / hours * 24
    daily_intra_cost = intra_cost / hours * 24
    return {'inter-continental': (inter_cost, daily_inter_cost),
            'intra-continental': (intra_cost, daily_intra_cost)}
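
# Worked example with the assumed rates above: 10 GB sent from an EU home to NA
# destinations over a 12-hour capture window costs 10 * 0.05 = 0.50 USD observed,
# projected to 0.50 / 12 * 24 = 1.00 USD per day.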
class IpGeoCache:
    data: Dict[str, IpGeo]

    def __init__(self):
        self.data = {}

    def __getitem__(self, ip: str) -> IpGeo:
        return self.data[ip]

    def load(self) -> bool:
        if os.path.exists("ip_cache.json"):
            try:
                with open("ip_cache.json") as f:
                    data = json.load(f)
                self.data = {ip: IpGeo(**data[ip]) for ip in data}
                return True
            except Exception:
                pass
        return False

    def save(self, ips: Dict[str, IpGeo]) -> bool:
        try:
            with open("ip_cache.json", "w") as f:
                json.dump({ip: ips[ip].__dict__ for ip in ips}, f)
            return True
        except Exception:
            return False

    def add_known(self, ip: str, geo: IpGeo) -> None:
        self.data[ip] = geo

    def add_unknown(self, ip: str) -> None:
        self.data[ip] = IpGeo("error", None, None, None)

    def is_known(self, ip: str) -> bool:
        return ip in self.data and self.data[ip].status == 'success'

    def is_unknown(self, ip: str) -> bool:
        return ip in self.data and self.data[ip].status != 'success'

    def unknown(self) -> List[str]:
        return [ip for ip in self.data if self.data[ip].status != 'success']

    def all_known(self) -> bool:
        return all(self.is_known(ip) for ip in self.data)
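
# The cache persists to ip_cache.json in the working directory, so repeated runs
# only hit the geo services for IPs that have not been resolved successfully yet.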
class GeoService(abc.ABC):
    @abc.abstractmethod
    def get_geo_data_batch(self, ips: List[str]) -> Tuple[Dict[str, IpGeo], bool]:
        ...

    @abc.abstractmethod
    def get_geo_data_single(self, ip: str) -> Tuple[IpGeo, bool]:
        ...
class IpApiGeoService(GeoService):
    cache: IpGeoCache
    batch_size: int = 100

    def __init__(self, cache: IpGeoCache) -> None:
        super().__init__()
        self.cache = cache

    def get_geo_data_batch(self, ips: List[str]) -> Tuple[Dict[str, IpGeo], bool]:
        results: Dict[str, IpGeo] = {}
        ok = True

        def flush(batch: List[dict]) -> bool:
            # POST one batch to the ip-api.com batch endpoint and cache the answers
            try:
                response = requests.post(
                    "https://ip-api.com/batch?fields=status,continentCode,countryCode,region,query",
                    json=batch)
                response.raise_for_status()
                for resp in response.json():
                    geo = IpGeo(
                        status=resp['status'],
                        continent=resp.get('continentCode'),
                        country=resp.get('countryCode'),
                        region=resp.get('region')
                    )
                    results[resp['query']] = geo
                    if geo.status == 'success':
                        self.cache.add_known(resp['query'], geo)
                    else:
                        self.cache.add_unknown(resp['query'])
                return True
            except Exception as e:
                print(f"Error fetching geo data: {e}")
                return False

        batch: List[dict] = []
        for ip in ips:
            if self.cache.is_known(ip):
                results[ip] = self.cache[ip]
                continue
            batch.append({"query": ip, "fields": "status,continentCode,countryCode,region,query"})
            # flush full batches as we go
            if len(batch) == self.batch_size:
                ok = flush(batch) and ok
                batch = []
        if batch:
            # don't forget the final, partial batch
            ok = flush(batch) and ok
        return results, ok

    def get_geo_data_single(self, ip: str) -> Tuple[IpGeo, bool]:
        results, ok = self.get_geo_data_batch([ip])
        return results.get(ip, IpGeo("error", None, None, None)), ok
class IpApi2GeoService(GeoService):
    cache: IpGeoCache

    def __init__(self, cache: IpGeoCache) -> None:
        super().__init__()
        self.cache = cache

    def get_geo_data_batch(self, ips: List[str]) -> Tuple[Dict[str, IpGeo], bool]:
        results: Dict[str, IpGeo] = {}
        ok = True
        for ip in ips:
            geo, success = self.get_geo_data_single(ip)
            results[ip] = geo
            ok = ok and success
        return results, ok

    def get_geo_data_single(self, ip: str) -> Tuple[IpGeo, bool]:
        try:
            if self.cache.is_known(ip):
                return self.cache[ip], True
            response = requests.get(f"https://ipapi.co/{ip}/json/")
            response.raise_for_status()
            resp_json = response.json()
            ip_geo = IpGeo(
                "success",
                resp_json['continent_code'],
                resp_json['country_code'],
                resp_json['region_code']
            )
            self.cache.add_known(ip, ip_geo)
            return self.cache[ip], True
        except Exception:
            return IpGeo("error", None, None, None), False
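
# Note: both public geo endpoints rate-limit free usage, hence the aggressive
# caching and the preference for the batch endpoint where available.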
def get_geo_data(ips: List[str]) -> Dict[str, IpGeo]:
    # try each service in turn until every ip resolves, reusing the on-disk cache
    cached_ips = IpGeoCache()
    cached_ips.load()
    services: List[GeoService] = [IpApi2GeoService(cached_ips), IpApiGeoService(cached_ips)]
    ok = False
    results: Dict[str, IpGeo] = {}
    for service in services:
        results, ok = service.get_geo_data_batch(ips)
        if cached_ips.all_known():
            break
    # persist whatever was resolved, even if some lookups failed
    cached_ips.save(cached_ips.data)
    if not ok:
        raise Exception(
            f"Some services failed. There are {len(cached_ips.unknown())} unknown IPs. "
            "Please review them manually.")
    return results
def main():
    folder = "data" if os.getcwd().endswith("nsg") else "nsg/data"
    # traverse the directory and process each file
    # note that the file structure can be nested (h=19\m=32\macAddress=00-0D-3A-13-1E-8E\PT1H.json)
    # we want to process all "leaf" files
    df = pd.DataFrame(columns=[
        'time', 'source_ip', 'dest_ip', 'source_port', 'dest_port', 'protocol',
        'traffic_flow', 'traffic_decision', 'flow_state',
        'packets_sent', 'bytes_sent', 'packets_received', 'bytes_received'])
    files_processed = 0
    print(f"{os.getcwd()}")
    for root, _, files in os.walk(folder):
        for file in files:
            path = os.path.join(root, file)
            if path.endswith(".json"):
                data_tuples = process_file(path)
                print(f"Processed {len(data_tuples)} flow tuples from {path}")
                df = pd.concat([df, pd.DataFrame(data_tuples)], ignore_index=True)
                files_processed += 1
    print(f"Processed {files_processed} files and {len(df)} flow tuples in total")
    print(f"total bytes sent: {format_bytes(df['bytes_sent'].sum())}")

    # summarize all traffic by destination IP
    dest_ip_summary = df.groupby('dest_ip', as_index=False).agg({
        'bytes_sent': 'sum',
        'bytes_received': 'sum',
    })
    geo_data = get_geo_data(list(dest_ip_summary['dest_ip'].unique()))

    # extend the dataframes with the geo data (continent, country, region)
    def geo_attr(ip: str, attr: str) -> Optional[str]:
        geo = geo_data.get(ip)
        return getattr(geo, attr) if geo is not None and geo.status == 'success' else None

    for col in ['continent', 'country', 'region']:
        dest_ip_summary[col] = dest_ip_summary['dest_ip'].map(lambda ip: geo_attr(ip, col))

    # estimate the cost from the raw flows: the v2 flow-tuple timestamp is a unix
    # epoch (seconds), and estimate_cost needs per-flow times and destination continents
    df['time'] = pd.to_datetime(pd.to_numeric(df['time']), unit='s')
    df['continent'] = df['dest_ip'].map(lambda ip: geo_attr(ip, 'continent'))
    home = IpGeo("success", "EU", "NL", "CA")  # where the traffic originates
    for kind, (observed, projected_daily) in estimate_cost(home, df).items():
        print(f"{kind} cost: {observed:.2f} USD observed, {projected_daily:.2f} USD/day projected")

    # summarize sent data by each of the above (by itself) and print each summary
    for col in ['continent', 'country', 'region']:
        print(f"Summary by {col}")
        print(dest_ip_summary.groupby(col, as_index=False).agg({
            'bytes_sent': 'sum',
            'bytes_received': 'sum',
        }))


if __name__ == "__main__":
    main()
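

# Stand-alone variant: parse the same flow logs, but skip the geo lookup and cost
# estimation and simply dump the raw flow tuples to flows.csv.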
import json
import os
from typing import Any, List

import pandas as pd
def process_file(path: str) -> List[dict]:
    if not path.endswith(".json"):
        raise ValueError("File must be in JSON format")
    with open(path) as f:
        data_dict = json.load(f)
    print(f"Processing {path} with {len(data_dict['records'])} records")
    # Flatten the JSON structure and extract flow tuples into a list of dicts
    flow_data = []
    for record in data_dict['records']:
        version = record['properties'].get('Version')
        if not version or int(version) != 2:
            # print(f"Unsupported version: {version}")
            continue
        for flow in record['properties']['flows']:
            rule = flow['rule']
            if rule != 'DefaultRule_AllowInternetOutBound':
                # print(f"Unsupported rule: {rule}")
                continue
            for subflow in flow['flows']:
                for tuple_str in subflow['flowTuples']:
                    tuple_data = tuple_str.split(',')
                    protocol = tuple_data[5]
                    direction = tuple_data[6]
                    decision = tuple_data[7]
                    state = tuple_data[8]
                    # we only care about completed (state 'E') outbound TCP connections
                    # that were accepted - those carry the byte counters
                    if protocol == 'T' and direction == 'O' and decision == 'A' and state == 'E':
                        flow_data.append({
                            'time': tuple_data[0],
                            'source_ip': tuple_data[1],
                            'dest_ip': tuple_data[2],
                            'source_port': int(tuple_data[3]),
                            'dest_port': int(tuple_data[4]),
                            'protocol': tuple_data[5],
                            'traffic_flow': tuple_data[6],
                            'traffic_decision': tuple_data[7],
                            'flow_state': tuple_data[8],
                            'packets_sent': int(tuple_data[9]) if tuple_data[9].isdigit() else 0,
                            'bytes_sent': int(tuple_data[10]) if tuple_data[10].isdigit() else 0,
                            'packets_received': int(tuple_data[11]) if tuple_data[11].isdigit() else 0,
                            'bytes_received': int(tuple_data[12]) if tuple_data[12].isdigit() else 0,
                        })
    return flow_data
def format_bytes(value: float, *args: Any, **kwargs: Any) -> str:
    """
    Converts a byte value into a human-readable string with the appropriate unit.

    :param value: The size in bytes to be converted.
    :return: A human-readable string representing the size.
    """
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    size = float(value)
    for unit in units[:-1]:
        if size < 1024:
            return f"{size:.4f} {unit}"
        size /= 1024.0
    return f"{size:.4f} {units[-1]}"
def main():
    folder = "data" if os.getcwd().endswith("nsg") else "nsg/data"
    # traverse the directory and process each file
    # note that the file structure can be nested (h=19\m=32\macAddress=00-0D-3A-13-1E-8E\PT1H.json)
    # we want to process all "leaf" files
    df = pd.DataFrame(columns=[
        'time', 'source_ip', 'dest_ip', 'source_port', 'dest_port', 'protocol',
        'traffic_flow', 'traffic_decision', 'flow_state',
        'packets_sent', 'bytes_sent', 'packets_received', 'bytes_received'])
    files_processed = 0
    print(f"{os.getcwd()}")
    for root, _, files in os.walk(folder):
        for file in files:
            path = os.path.join(root, file)
            if path.endswith(".json"):
                data_tuples = process_file(path)
                print(f"Processed {len(data_tuples)} flow tuples from {path}")
                df = pd.concat([df, pd.DataFrame(data_tuples)], ignore_index=True)
                files_processed += 1
    print(f"Processed {files_processed} files and {len(df)} flow tuples in total")
    print(f"total bytes sent: {format_bytes(df['bytes_sent'].sum())}")
    df.to_csv("flows.csv", index=False)


if __name__ == "__main__":
    main()