-
-
Save brianbienvenu/1cfa067b5580610cb0fc3379211053eb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from scapy.all import IP, TCP, Raw, rdpcap | |
from colorama import Fore, Style | |
import json | |
from operator import itemgetter | |
from deepdiff import DeepDiff | |
def sorted_json(js, result): | |
def norm_str(s): | |
# because of str special handling of single quotes | |
if type(s) == dict and 'pn' in s: | |
return s['pn'] | |
return str(s).replace("'", '"') | |
if type(js) in [int, str, bool, float] or js is None: | |
return js | |
if type(js) == list: | |
res = [sorted_json(i, {}) for i in js] | |
return sorted(res, key=norm_str) | |
items = sorted(js.items(), key=itemgetter(0)) | |
for k, v in items: | |
result[k] = sorted_json(v, {}) | |
return result | |
def print_diff(prev: dict, curr: dict): | |
print_with_color(str(prev), str(curr)) | |
global verbose | |
if verbose: | |
dd = DeepDiff(prev, curr) | |
if 'values_changed' in dd: | |
for key, value in dd['values_changed'].items(): | |
key = key[4:] | |
print(f"\t{Fore.LIGHTYELLOW_EX}{key} {Fore.LIGHTRED_EX}{value['old_value']} {Fore.RESET}=> {Fore.GREEN}{value['new_value']}{Style.RESET_ALL}") | |
if 'iterable_item_added' in dd: | |
for key, value in dd['iterable_item_added'].items(): | |
key = key[4:] | |
print(f"\t{Fore.LIGHTYELLOW_EX}{key} {Fore.RESET}Added {Fore.GREEN}{value}{Style.RESET_ALL}") | |
if 'iterable_item_removed' in dd: | |
for key, value in dd['iterable_item_removed'].items(): | |
key = key[4:] | |
print(f"\t{Fore.LIGHTYELLOW_EX}{key} {Fore.RESET}Removed {Fore.LIGHTRED_EX}{value}{Style.RESET_ALL}") | |
def print_with_color(prev: str, curr: str): | |
# highlight the differences between prev and curr | |
for i, (c1, c2) in enumerate(zip(prev, curr)): | |
if c1 == c2: | |
print(f"{Fore.RESET}{c2}", end='') | |
else: | |
print(f"{Fore.GREEN}{c2}", end='') | |
print(Style.RESET_ALL) | |
def extract_http_requests_responses(pcap_file, destination_ip): | |
http_data = [] | |
def process_packet(packet): | |
if IP in packet and packet[IP].dst == destination_ip and TCP in packet and packet[TCP].dport == 80: | |
# if IP in packet and (packet[IP].dst == destination_ip or packet[IP].src == destination_ip) and TCP in packet: | |
if Raw in packet: | |
data = packet[Raw].load.decode('utf-8', errors='ignore') | |
http_data.append(data) | |
try: | |
packets = rdpcap(pcap_file) | |
for packet in packets: | |
process_packet(packet) | |
return http_data | |
except Exception as e: | |
print(f"Error: {e}") | |
if __name__ == "__main__": | |
if len(sys.argv) not in (3, 4): | |
print("Usage: python open_pcap.py <destination_ip> <pcap_file>") | |
sys.exit(1) | |
destination_ip = sys.argv[1] | |
pcap_file = sys.argv[2] | |
verbose = False | |
if len(sys.argv) > 3 and sys.argv[3] == '-v': | |
verbose = True | |
print(f'Opening {pcap_file}, printing HTTP payloads for {destination_ip}') | |
http_data = extract_http_requests_responses(pcap_file, destination_ip) | |
if not http_data: | |
print("No HTTP requests or responses found for the specified destination IP.") | |
sys.exit(1) | |
headers = [d for d in http_data if d.startswith('PUT') or d.startswith('GET') or d.startswith('POST') or d.startswith('HTTP')] | |
payloads = [d for d in http_data if d.startswith('{"requests"')] | |
leftover = [d for d in http_data if d not in headers and d not in payloads] | |
print(f'Got {len(headers)} headers, {len(payloads)} payloads and {len(leftover)} leftovers') | |
jspayloads = [] | |
for p in payloads: | |
js = json.loads(p) | |
jspayloads.append(sorted_json(js, {})) | |
print(jspayloads[0]) | |
for prev, curr in zip(jspayloads, jspayloads[1:]): | |
print_diff(prev, curr) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment