Skip to content

Instantly share code, notes, and snippets.

@Cyber-Neuron
Created October 1, 2018 20:34
Show Gist options
  • Save Cyber-Neuron/f817ab79eadd89b16e19a2ac40784664 to your computer and use it in GitHub Desktop.
Save Cyber-Neuron/f817ab79eadd89b16e19a2ac40784664 to your computer and use it in GitHub Desktop.
Restore HTTP streams from a pcap file using dpkt
# encoding=utf8
import dpkt
from collections import OrderedDict
import json
import string
import fire
import gzip
# Characters that hex_escape() emits verbatim; anything else is hex-escaped.
_printable = string.ascii_letters + string.digits + string.punctuation + ' '


def hex_escape(s):
    r"""Return *s* as text with non-printable characters written as ``\xNN``.

    Characters found in ``_printable`` (ASCII letters, digits, punctuation
    and the space) are copied unchanged; every other character is replaced
    by a backslash, ``x`` and its code point in lowercase hex, zero-padded
    to at least two digits.

    *s* may be a text string, or a ``bytes``/``bytearray`` whose iteration
    yields integers (as on Python 3); those integers are mapped to the
    character with the same code point before the test.

    Note that the backslash is itself part of ``string.punctuation`` and is
    therefore passed through as-is, so the output cannot always be decoded
    back unambiguously.
    """
    pieces = []
    for c in s:
        if isinstance(c, int):
            # bytes (Python 3) and bytearray iterate as ints, for which
            # ``c in _printable`` would raise TypeError.
            c = chr(c)
        pieces.append(c if c in _printable else r'\x{0:02x}'.format(ord(c)))
    return ''.join(pieces)
def _parse_pcap_file(pcap):
flows = OrderedDict()
for ts, buf in pcap:
eth = dpkt.ethernet.Ethernet(buf)
if eth.type != dpkt.ethernet.ETH_TYPE_IP:
continue
ip = eth.data
if ip.p != dpkt.ip.IP_PROTO_TCP:
continue
tcp = ip.data
tupl = (ip.src, ip.dst, tcp.sport, tcp.dport)
tupl_s = tuple(sorted(tupl))
if tupl_s in flows:
if tupl in flows[ tupl_s ]:
flows[ tupl_s ][tupl] = flows[ tupl_s ][tupl] + tcp.data
else:
flows[ tupl_s ][tupl] = tcp.data
else:
rr = OrderedDict()
rr[tupl] = tcp.data
flows[tupl_s] = rr
req_res = []
for tupl in flows:
try:
https = []
for s in flows[ tupl ]:
stream = flows[ tupl ][s]
if stream[:4] == 'HTTP':
http = dpkt.http.Response(stream)
data = {"headers":http.headers, "body":hex_escape(http.body[:500]), "type":"response"}
https.append(data)
elif stream[:3] == 'GET' or stream[:4] == 'POST':
http = dpkt.http.Request(stream)
data = {"headers":http.headers, "body":http.body, "type":"request", "method":stream[:4]}
https.append(data)
req_res.append(https)
except dpkt.UnpackError:
pass
return req_res
def process(pfile):
    """Parse the capture file *pfile* and write its HTTP messages as JSON.

    The input may be a plain or a gzip-compressed pcap file; compression is
    detected from the file's first two bytes rather than from its name.

    The result of ``_parse_pcap_file`` is written to a file placed next to
    the input, named after the input's file name up to its first dot plus
    ``.json`` (``dir/trace.pcap.gz`` -> ``dir/trace.json``).

    The output file is only created after parsing has succeeded.
    """
    import os.path  # local import: not part of the module's import header

    # gzip members start with the magic bytes 0x1f 0x8b.
    with open(pfile, 'rb') as probe:
        is_gzip = probe.read(2) == b'\x1f\x8b'
    opener = gzip.open if is_gzip else open

    with opener(pfile, 'rb') as f:
        parsed = _parse_pcap_file(dpkt.pcap.Reader(f))

    # Cut at the first dot of the file name only, so dots in the directory
    # part ("./x.pcap", "/home/a.b/x.pcap") do not truncate the path.
    folder, filename = os.path.split(pfile)
    out_path = os.path.join(folder, filename.split(".")[0] + ".json")

    # Text mode: json.dump writes str, which a binary file rejects on
    # Python 3.
    with open(out_path, "w") as wf:
        json.dump(parsed, wf)
# Command-line entry point. Guarded so that importing this module (e.g. to
# reuse process()) does not start the CLI as a side effect.
if __name__ == "__main__":
    fire.Fire()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment