Skip to content

Instantly share code, notes, and snippets.

@kbandla
Created March 16, 2016 14:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save kbandla/48e6e9fb855103551195 to your computer and use it in GitHub Desktop.
Save kbandla/48e6e9fb855103551195 to your computer and use it in GitHub Desktop.
quick code for issue #254 ( 6244444680ba446da153812333568096 )
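'''
dpkt issue 254

Quick reproduction script: reads sample.pcap and counts the packets seen
for each TCP/UDP flow (source address/port to destination address/port).
'''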
import dpkt
from dpkt.ip import IP
from dpkt.ethernet import Ethernet
from dpkt.arp import ARP
from pprint import pprint
import socket
# Open the capture in binary mode; the handle is passed to dpkt's pcap reader.
# NOTE(review): f is never closed in the visible part of the script -- confirm
# whether relying on interpreter exit is intended.
f = open('sample.pcap', 'rb')
# Reader over the capture file; iterated further down as (ts, buf) pairs.
pcap = dpkt.pcap.Reader(f)
def ip_to_str(address):
    """
    Convert a packed IPv4 address into dotted-quad notation.

    address -- the 4-byte packed (network byte order) form of the address,
               such as the ``src``/``dst`` field of a dpkt IP header.
               This is NOT an integer.

    Returns the human readable string, e.g. '127.0.0.1'.
    socket.inet_ntoa raises an error when the input is not exactly
    4 bytes long.
    """
    return socket.inet_ntoa(address)
class Flow(object):
    '''
    A (src, sport, dst, dport) record identifying one direction of a flow.

    Two flows compare equal when all four fields are equal, and equal flows
    hash identically.  Because the fields are mutable, do not modify a Flow
    while it is stored in a set or used as a dictionary key.

    Code from Honeysnap
    https://github.com/honeynet
    '''

    def __init__(self, src=None, dst=None, sport=None, dport=None):
        # All fields are optional so the original "create, then assign
        # attributes" usage keeps working unchanged.
        self.src = src
        self.dst = dst
        self.sport = sport
        self.dport = dport

    def _key(self):
        # Single source of truth for equality and hashing.
        return (self.src, self.sport, self.dst, self.dport)

    def __eq__(self, other):
        # Defer to the other operand instead of raising AttributeError when
        # compared with something that is not a Flow (e.g. None).
        if not isinstance(other, Flow):
            return NotImplemented
        return self._key() == other._key()

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so keep an explicit __ne__.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __hash__(self):
        # Defining __eq__ alone makes the class unhashable on Python 3 and
        # inconsistent (identity hash) on Python 2.
        return hash(self._key())

    def __repr__(self):
        return "%s.%s-%s.%s" % (self.src, self.sport, self.dst, self.dport)

    def isSrcSport(self, src, sport):
        '''Return True when this flow originates from the given src and sport.'''
        return self.src == src and self.sport == sport
# Packet count per flow, keyed by the flow's repr() string
# ("src.sport-dst.dport").
flows = {}
for ts, buf in pcap:
    eth = dpkt.ethernet.Ethernet(buf)
    if eth.type != dpkt.ethernet.ETH_TYPE_IP:
        # Single-argument print() gives the same output on Python 2 and 3.
        print('Non IP Packet type not supported')
        continue
    ip = eth.data
    # Only TCP (6) and UDP (17) carry the port pair used for the flow key.
    if ip.p not in (dpkt.ip.IP_PROTO_TCP, dpkt.ip.IP_PROTO_UDP):
        continue
    transport = ip.data
    if not hasattr(transport, 'sport'):
        # dpkt leaves the payload as raw bytes when it does not decode the
        # transport header (for instance non-first IP fragments); there are
        # no ports to read, so skip the packet instead of crashing.
        continue
    flow = Flow()
    flow.src = ip_to_str(ip.src)
    flow.dst = ip_to_str(ip.dst)
    flow.sport = transport.sport
    flow.dport = transport.dport
    key = repr(flow)
    # dict.has_key() no longer exists on Python 3; get() covers both versions.
    flows[key] = flows.get(key, 0) + 1
pprint(flows)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment