Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Marsman1996/392dde6f6f6fcdb32f1f50725613b54d to your computer and use it in GitHub Desktop.
Save Marsman1996/392dde6f6f6fcdb32f1f50725613b54d to your computer and use it in GitHub Desktop.
Extract the flow of requests and responses from a Wireshark dump JSON exported file
import sys
import json
from urllib.parse import urlparse, parse_qs
def parse_multimap(ordered_pairs):
    """Build a dict from JSON key/value pairs, keeping every duplicate key.

    Meant to be passed as the ``object_pairs_hook`` of ``json.loads``. A key
    that occurs once maps to its value unchanged; a key that occurs several
    times maps to one flat list holding all of its values, in order of
    appearance.

    :param ordered_pairs: iterable of ``(key, value)`` pairs
    :return: dict of pairs with the duplicate keys' values as list
    """
    multimap = {}
    # Keys whose entry has already been turned into a list of values. Tracked
    # explicitly (instead of checking ``isinstance(..., list)``) so a value
    # that is itself a list, i.e. a JSON array, is not mistaken for one.
    repeated = set()
    for key, value in ordered_pairs:
        if key not in multimap:
            multimap[key] = value
        elif key in repeated:
            # Third or later occurrence: extend the existing list rather
            # than wrapping it again.
            multimap[key].append(value)
        else:
            # Second occurrence: promote the single value to a list.
            multimap[key] = [multimap[key], value]
            repeated.add(key)
    return multimap
def _as_list(value):
    """Return *value* unchanged if it is a list, else wrap it in a one-item list."""
    return value if isinstance(value, list) else [value]


def _print_field(prefix, value):
    """Print ``prefix + value``; a repeated field (a list) prints one line per value."""
    for item in _as_list(value):
        print(f"{prefix}{item}")


def _first_values(query):
    """Parse a query/form string and keep the first value of every parameter."""
    return {name: values[0] for name, values in parse_qs(query).items()}


def _packet_endpoints(layers):
    """Return ``(source, destination)`` addresses from the ``ip`` or ``ipv6`` layer."""
    for proto in ("ip", "ipv6"):
        if proto in layers:
            return layers[proto][f"{proto}.src"], layers[proto][f"{proto}.dst"]
    # No IP layer at all: keep going instead of aborting the whole dump.
    return "unknown", "unknown"


def _print_start_line(http):
    """Print the request line (plus GET query parameters) or the status line."""
    # The start-line entry is the first key containing "HTTP"
    # (presumably the raw request/status line text -- verify against the export).
    rkey = next((key for key in http if "HTTP" in key), None)
    if not rkey:
        return
    start = http[rkey]
    if "http.request.uri" in start:
        method = start["http.request.method"]
        uri = start["http.request.uri"]
        print(method + " " + uri)
        if method == "GET":
            # Show the query string parameters as a JSON object.
            print(json.dumps(_first_values(urlparse(uri).query), indent=2))
    elif "http.response.code" in start:
        print(start["http.response.code"] + " " + start["http.response.phrase"])


def _print_body(http):
    """Print ``http.file_data``, pretty-printed for JSON and URL-encoded bodies."""
    file_data = http["http.file_data"]
    # Content-Type may be absent, or a list if the field was repeated.
    content_type = " ".join(
        str(item) for item in _as_list(http.get("http.content_type", ""))
    )
    if "json" in content_type:
        try:
            print(json.dumps(json.loads(file_data), indent=2))
        except json.decoder.JSONDecodeError:
            # Declared as JSON but not parseable: fall back to the raw body.
            print(file_data)
    elif "urlencoded" in content_type:
        print(file_data)
        print(json.dumps(_first_values(file_data), indent=2))
    else:
        print(file_data)


def main(filename):
    """Print the HTTP request/response flow found in a Wireshark JSON export.

    For every packet that has an ``http`` layer this prints the source and
    destination addresses, the request line or status line, a few selected
    header fields and the body, followed by a blank separator.

    :param filename: path to the JSON file exported from Wireshark
    :return: None
    """
    # JSON is UTF-8 by specification; do not depend on the locale's default
    # encoding when reading the export.
    with open(filename, "r", encoding="utf-8") as f:
        packets = json.load(f, object_pairs_hook=parse_multimap)

    for packet in packets:
        layers = packet["_source"]["layers"]
        if "http" not in layers:
            # Skip packets that carry no HTTP layer
            continue
        source, dest = _packet_endpoints(layers)
        print(f"{source} -> {dest}")

        http = layers["http"]
        _print_start_line(http)
        for field, prefix in (
            ("http.www_authenticate", "WWW-Authenticate: "),
            ("http.authorization", "Authorization: "),
            ("http.request.full_uri", ""),
            ("http.content_type", "Content-Type: "),
        ):
            if field in http:
                _print_field(prefix, http[field])
        if "http.file_data" in http:
            _print_body(http)
        if "http.location" in http:
            _print_field("", http["http.location"])
        print("\n")
if __name__ == '__main__':
    # Exactly one argument is expected: the path of the Wireshark JSON export.
    if len(sys.argv) != 2:
        # The documented invocation redirects stdout to a file, so misuse is
        # reported on stderr with a non-zero exit status; sys.exit() given a
        # string does both.
        sys.exit(f"Usage: python {sys.argv[0]} wireshark_export.json > output.txt")
    main(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment