Skip to content

Instantly share code, notes, and snippets.

@athoune
Created July 24, 2012 15:41
Show Gist options
  • Save athoune/3170756 to your computer and use it in GitHub Desktop.
Save athoune/3170756 to your computer and use it in GitHub Desktop.
ugly script to spy http connection
import time
import pcap, dpkt, socket
# code stolen from:
# http://bramp.net/blog/2010/01/follow-http-stream-with-decompression/
pc = pcap.pcap('eth0')
count = 0
ports = (80, 8080, 888)
# Pcap writer
pcw = dpkt.pcap.Writer(open('pkts.pcap','wb'))
def tcp_flags(flags):
ret = ''
if flags & dpkt.tcp.TH_FIN:
ret = ret + 'F'
if flags & dpkt.tcp.TH_SYN:
ret = ret + 'S'
if flags & dpkt.tcp.TH_RST:
ret = ret + 'R'
if flags & dpkt.tcp.TH_PUSH:
ret = ret + 'P'
if flags & dpkt.tcp.TH_ACK:
ret = ret + 'A'
if flags & dpkt.tcp.TH_URG:
ret = ret + 'U'
if flags & dpkt.tcp.TH_ECE:
ret = ret + 'E'
if flags & dpkt.tcp.TH_CWR:
ret = ret + 'C'
return ret
conn = dict()
class Connection(object):
def __init__(self, data):
self.start = time.time()
self.data = data
def append(self, data):
self.data += data
def __len__(self):
return len(self.data)
def chronometer(self):
return (time.time() - self.start) * 1000
# Snooping on HTTP traffic
def process(ts, pkt, *args):
eth = dpkt.ethernet.Ethernet(pkt)
ip = eth.data
if ip.__class__ == dpkt.ip.IP:
ip1, ip2 = map(socket.inet_ntoa, [ip.src, ip.dst])
#if ip.p != 6:
#return
l7 = ip.data
if l7.__class__ == dpkt.tcp.TCP:
sport, dport = [l7.sport, l7.dport]
if (sport in ports or dport in ports):# and len(l7.data) > 0:
tupl = (ip.src, ip.dst, l7.sport, l7.dport)
if tupl in conn:
conn[tupl].append(l7.data)
else:
conn[tupl] = Connection(l7.data)
try:
stream = conn[tupl].data
print conn[tupl].chronometer(),
if stream[:4] == 'HTTP':
http = dpkt.http.Response(stream)
print "Response", http.status
else:
http = dpkt.http.Request(stream)
print "Request", http.method, http.uri
stream = stream[len(http):]
if len(stream) == 0:
del conn[tupl]
else:
conn[tupl] = Connection(stream)
except dpkt.UnpackError:
pass
if __name__ == "__main__":
try:
pc.loop(process)
except KeyboardInterrupt:
print pc.stats()
pcw.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment