Last active
November 11, 2017 10:13
-
-
Save candale/9eca946a137a993580cab205c91abb10 to your computer and use it in GitHub Desktop.
Python Packet Sniffer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import socket | |
from struct import unpack | |
import pprint | |
from collections import namedtuple | |
interface = sys.argv[1] | |
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP) | |
raw_socket.setsockopt(socket.SOL_SOCKET, 25, interface + '\0') | |
packet_stats = {'total_number_of_packets': 0} | |
def extract_tcp_ip_header(packet_payload): | |
ip_header = packet_payload[:20] | |
ip_header = unpack('!BBHHHBBH4s4s', ip_header) | |
version = ip_header[0] >> 4 | |
header_length = (ip_header[0] & 0xF) * 4 | |
ttl = ip_header[5] | |
protocol = ip_header[6] | |
source_addr = socket.inet_ntoa(ip_header[8]) | |
destination_addr = socket.inet_ntoa(ip_header[9]) | |
IPHeader = namedtuple( | |
'IPHeader', | |
( | |
'version', 'header_length', 'ttl', 'protocol', 'source_addr', | |
'destination_addr' | |
) | |
) | |
return IPHeader(version, header_length, ttl, protocol, source_addr, destination_addr) | |
def extract_tcp_header(packet_payload, ip_header): | |
tcp_header = packet_payload[ip_header.header_length:ip_header.header_length + 20] | |
tcp_header = unpack('!HHLLBBHHH', tcp_header) | |
TCPHeader = namedtuple( | |
'TCPHeader', | |
( | |
'source_port', 'destination_port', 'sequence', 'ack', | |
'header_length' | |
) | |
) | |
header_length = (tcp_header[4] >> 4) * 4 | |
return TCPHeader( | |
tcp_header[0], tcp_header[1], tcp_header[2], tcp_header[3], | |
header_length) | |
def extract_data(packet_payload, ip_header, tcp_header): | |
total_header_size = ip_header.header_length + tcp_header.header_length | |
data_size = len(packet_payload) - total_header_size | |
return packet_payload[total_header_size:], data_size | |
def receive_and_process(): | |
packet = raw_socket.recvfrom(65565) | |
packet_payload = packet[0] | |
ip_header = extract_tcp_ip_header(packet_payload) | |
tcp_header = extract_tcp_header(packet_payload, ip_header) | |
data, data_size = extract_data(packet_payload, ip_header, tcp_header) | |
if ip_header.source_addr not in packet_stats: | |
packet_stats[ip_header.source_addr] = { | |
'number_of_packets': 0, | |
'number_of_bytes': 0, | |
'average_packet_size': 0 | |
} | |
p_stat = packet_stats[ip_header.source_addr] | |
p_stat['average_packet_size'] = ( | |
data_size if p_stat['number_of_packets'] == 0 else | |
(p_stat['average_packet_size'] + data_size) / 2 | |
) | |
p_stat['number_of_packets'] += 1 | |
p_stat['number_of_bytes'] += data_size | |
packet_stats['total_number_of_packets'] += 1 | |
if packet_stats['total_number_of_packets'] % 500 == 0: | |
pprint.pprint(packet_stats) | |
print ip_header | |
print tcp_header | |
print "Data length: ", data_size | |
# print data | |
def run(): | |
while True: | |
receive_and_process() | |
if __name__ == '__main__': | |
try: | |
run() | |
except KeyboardInterrupt: | |
pprint.pprint(packet_stats) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
i got error using this , can you help me out the error says:
line 144, in init
_socket.socket.init(self, family, type, proto, fileno)
OSError: [WinError 10043] The requested protocol has not been configured into the system, or no implementation for it exists