Skip to content

Instantly share code, notes, and snippets.

@brianbienvenu
Created October 14, 2023 13:30
Show Gist options
  • Save brianbienvenu/1cfa067b5580610cb0fc3379211053eb to your computer and use it in GitHub Desktop.
Save brianbienvenu/1cfa067b5580610cb0fc3379211053eb to your computer and use it in GitHub Desktop.
import sys
from scapy.all import IP, TCP, Raw, rdpcap
from colorama import Fore, Style
import json
from operator import itemgetter
from deepdiff import DeepDiff
def sorted_json(js, result):
    """Return a canonically ordered copy of a decoded JSON value.

    Scalars (int, str, bool, float, None) are returned unchanged. Lists are
    rebuilt with every element canonicalised and then sorted. Dicts are
    copied into ``result`` with keys in sorted order and values
    canonicalised recursively.

    Args:
        js: A decoded JSON value (dict, list or scalar).
        result: Dict that receives the sorted entries when ``js`` is a dict;
            it is mutated and returned. Ignored for lists and scalars.

    Returns:
        The canonicalised value (``result`` itself when ``js`` is a dict).
    """
    def norm_str(s):
        # Dicts that carry a 'pn' entry are ordered by that entry.
        if isinstance(s, dict) and 'pn' in s:
            return s['pn']
        # str() of containers quotes strings with single quotes; swap them
        # for double quotes so the key resembles the JSON text.
        return str(s).replace("'", '"')

    if js is None or isinstance(js, (int, str, float)):  # bool is an int
        return js
    if isinstance(js, list):
        res = [sorted_json(i, {}) for i in js]
        try:
            return sorted(res, key=norm_str)
        except TypeError:
            # Raw 'pn' values may not be comparable with the string keys of
            # other elements (or with each other); fall back to comparing
            # every key as a string instead of crashing.
            return sorted(res, key=lambda item: str(norm_str(item)))
    for k, v in sorted(js.items(), key=itemgetter(0)):
        result[k] = sorted_json(v, {})
    return result
def print_diff(prev: dict, curr: dict):
    """Print ``curr`` highlighted against ``prev``, plus details if verbose.

    The string forms of both dicts are handed to ``print_with_color``. When
    the module-level ``verbose`` flag is truthy, a DeepDiff of the two dicts
    is computed and one line is printed per changed value, per added
    iterable item and per removed iterable item.

    Args:
        prev: The earlier payload.
        curr: The later payload.
    """
    print_with_color(str(prev), str(curr))
    # Reading a global needs no ``global`` statement. Looking it up through
    # globals() also tolerates the flag never having been set, e.g. when the
    # functions are imported instead of run as a script.
    if not globals().get('verbose', False):
        return
    dd = DeepDiff(prev, curr)
    # key[4:] drops the first four characters of each DeepDiff path
    # (presumably the leading "root").
    if 'values_changed' in dd:
        for key, value in dd['values_changed'].items():
            path = key[4:]
            print(f"\t{Fore.LIGHTYELLOW_EX}{path} {Fore.LIGHTRED_EX}{value['old_value']} {Fore.RESET}=> {Fore.GREEN}{value['new_value']}{Style.RESET_ALL}")
    item_reports = (
        ('iterable_item_added', 'Added', Fore.GREEN),
        ('iterable_item_removed', 'Removed', Fore.LIGHTRED_EX),
    )
    for category, label, color in item_reports:
        if category in dd:
            for key, value in dd[category].items():
                path = key[4:]
                print(f"\t{Fore.LIGHTYELLOW_EX}{path} {Fore.RESET}{label} {color}{value}{Style.RESET_ALL}")
def print_with_color(prev: str, curr: str):
    """Print ``curr`` with the characters that differ from ``prev`` in green.

    Characters are compared position by position. Every character of
    ``curr`` is printed: those equal to the character at the same index in
    ``prev`` use the default colour, all others (including any beyond the
    end of ``prev``) are green. The line ends with a style reset.

    Args:
        prev: The reference text.
        curr: The text to print.
    """
    prev_len = len(prev)
    pieces = []
    for index, c2 in enumerate(curr):
        # Iterating over curr rather than zip(prev, curr) keeps the tail of
        # curr visible when it is longer than prev.
        unchanged = index < prev_len and prev[index] == c2
        color = Fore.RESET if unchanged else Fore.GREEN
        pieces.append(f"{color}{c2}")
    print("".join(pieces), end='')
    print(Style.RESET_ALL)
def extract_http_requests_responses(pcap_file, destination_ip, port=80):
    """Collect the decoded TCP payloads sent to ``destination_ip``.

    The capture is read with ``rdpcap``. A packet contributes its raw
    payload when it has an IP layer whose destination equals
    ``destination_ip``, a TCP layer whose destination port equals ``port``
    and a Raw layer. Payloads are decoded as UTF-8 with undecodable bytes
    ignored.

    NOTE: only packets travelling *to* ``destination_ip`` are matched, so
    traffic sent *from* that address is not included.

    Args:
        pcap_file: Path of the capture file to read.
        destination_ip: Destination IP address to filter on.
        port: Destination TCP port to filter on (defaults to 80).

    Returns:
        A list of payload strings in capture order, or ``None`` if reading
        or processing the capture raised an exception (the error is
        printed).
    """
    def is_wanted(packet):
        return (
            IP in packet
            and packet[IP].dst == destination_ip
            and TCP in packet
            and packet[TCP].dport == port
            and Raw in packet
        )

    try:
        packets = rdpcap(pcap_file)
        return [
            packet[Raw].load.decode('utf-8', errors='ignore')
            for packet in packets
            if is_wanted(packet)
        ]
    except Exception as e:
        # Best-effort: report the problem and let the caller treat the
        # falsy result as "nothing found".
        print(f"Error: {e}")
        return None
# Script entry point: read a capture, split the payloads sent to the given IP
# into headers / JSON payloads / leftovers, then print the first JSON payload
# and a diff between each consecutive pair.
if __name__ == "__main__":
    if len(sys.argv) not in (3, 4):
        print("Usage: python open_pcap.py <destination_ip> <pcap_file> [-v]")
        sys.exit(1)
    destination_ip = sys.argv[1]
    pcap_file = sys.argv[2]
    # Module-level flag read by print_diff(); must stay a global name.
    verbose = len(sys.argv) > 3 and sys.argv[3] == '-v'
    print(f'Opening {pcap_file}, printing HTTP payloads for {destination_ip}')
    http_data = extract_http_requests_responses(pcap_file, destination_ip)
    if not http_data:
        print("No HTTP requests or responses found for the specified destination IP.")
        sys.exit(1)

    header_prefixes = ('PUT', 'GET', 'POST', 'HTTP')
    payload_prefix = '{"requests"'
    headers = [d for d in http_data if d.startswith(header_prefixes)]
    payloads = [d for d in http_data if d.startswith(payload_prefix)]
    # Same result as "not in headers and not in payloads", without scanning
    # both lists for every entry.
    leftover = [
        d for d in http_data
        if not d.startswith(header_prefixes) and not d.startswith(payload_prefix)
    ]
    print(f'Got {len(headers)} headers, {len(payloads)} payloads and {len(leftover)} leftovers')

    jspayloads = []
    for p in payloads:
        try:
            js = json.loads(p)
        except json.JSONDecodeError as e:
            # A payload may be incomplete (presumably when a body spans
            # several packets); skip it rather than abort the whole run.
            print(f"Skipping payload that is not valid JSON: {e}")
            continue
        jspayloads.append(sorted_json(js, {}))

    if not jspayloads:
        print("No JSON payloads to compare.")
        sys.exit(1)
    print(jspayloads[0])
    for prev, curr in zip(jspayloads, jspayloads[1:]):
        print_diff(prev, curr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment