# Source: GitHub gist killswitch-GUI/314e79581f2619a18d94c81d53e5466f
# Last active: April 17, 2024 02:21
# Note: GitHub flagged this gist as containing bidirectional Unicode text that
# may be interpreted or compiled differently than it appears. Review it in an
# editor that reveals hidden Unicode characters.
#!/usr/bin/python
# Pure Python Tiny Packet Sniffer
# Author: Alexander Rymdeko-Harvey
# Twitter: @Killswitch-GUI
import socket
import struct
import time
from datetime import datetime
def outputPcapPFile(fileName):
    """Create (or truncate) a capture file and write the pcap global header.

    The 24-byte header is packed in the host's native byte order. Fields
    written, in order:

        magic number   0xa1b2c3d4
        version major  2
        version minor  4
        thiszone       0
        sigfigs        0
        snaplen        0x040000
        network        1

    fileName is passed through str() before the file is opened.
    """
    headerFields = (
        0xa1b2c3d4,  # magic number
        2,           # version major
        4,           # version minor
        0,           # thiszone
        0,           # sigfigs
        0x040000,    # snaplen
        1,           # network (link-layer type)
    )
    globalHeader = struct.pack("@IHHIIII", *headerFields)
    # 'wb+' truncates an existing file, so each capture starts from scratch.
    with open(str(fileName), 'wb+') as pcapFile:
        pcapFile.write(globalHeader)
def ouputPcapPacket(fileName, pLen, packet, timestamp=None):
    """Append one packet record (16-byte record header + data) to a pcap file.

    Args:
        fileName: path of the capture file; passed through str() and opened
            in append mode.
        pLen: packet length in bytes, stored as both incl_len and orig_len.
        packet: raw frame bytes written right after the record header.
        timestamp: optional capture time as seconds since the epoch (a float
            is fine). Defaults to the current time from time.time().

    The record header is packed as four unsigned 32-bit integers in native
    byte order: ts_sec, ts_usec, incl_len, orig_len.
    """
    if timestamp is None:
        timestamp = time.time()
    # Both fields come from ONE clock reading. Feeding gmtime() fields to
    # mktime() (which expects local time) shifts the result by the local UTC
    # offset, and reading the microseconds from a second clock call can pair
    # them with the wrong second.
    seconds = int(timestamp)
    microseconds = int((timestamp - seconds) * 1000000)
    recordHeader = struct.pack("@IIII", seconds, microseconds, pLen, pLen)
    with open(str(fileName), 'ab') as pcapFile:
        pcapFile.write(recordHeader)
        pcapFile.write(packet)
def parseEthernetHeader(data):
    """Split the 14-byte Ethernet header at the start of a frame.

    Returns a tuple (dst, src, nextType):
        dst      - destination host address as a tuple of six octets
        src      - source host address as a tuple of six octets
        nextType - 16-bit type of the encapsulated protocol (IP? ARP? ...)

    Raises struct.error when data holds fewer than 14 bytes.
    """
    # Twelve address octets followed by one big-endian 16-bit type field.
    headerFields = struct.unpack('!12BH', data[:14])
    return headerFields[:6], headerFields[6:12], headerFields[12]
def parseIpHeader(data):
    """Extract the protocol and endpoint addresses from a frame's IPv4 header.

    data is the whole captured frame; the IPv4 header is read from byte 14
    onward, i.e. directly after a 14-byte Ethernet header.

    Returns a tuple (proto, sourceIpStr, destIpStr):
        proto       - protocol number of the payload (1 ICMP, 6 TCP, 17 UDP)
        sourceIpStr - source address in dotted-decimal notation
        destIpStr   - destination address in dotted-decimal notation

    Raises struct.error when the frame is shorter than 34 bytes.
    """
    # Only three fields of the fixed 20-byte header are needed, so the rest
    # are skipped with pad bytes instead of being decoded into unused locals:
    #   9x  version/IHL, TOS, total length, ident, flags/fragment, TTL
    #   B   protocol
    #   2x  header checksum
    #   4s  source address, 4s destination address
    proto, rawSource, rawDest = struct.unpack('!9xB2x4s4s', data[14:34])
    return proto, socket.inet_ntoa(rawSource), socket.inet_ntoa(rawDest)
def parseTcpHeader(data):
    """Return (sourcePort, destPort) from the TCP header of an IPv4 frame.

    data is the whole captured frame: a 14-byte Ethernet header, then the
    IPv4 header, then the TCP header. The TCP header is located through the
    IPv4 header-length field, so frames carrying IP options are parsed
    correctly instead of being read at a fixed offset of 34.

    Raises struct.error when the frame ends before the two port fields.
    """
    # The low nibble of the first IPv4 byte is the header length in 32-bit
    # words. A valid header is at least 5 words (20 bytes); smaller values
    # are malformed and fall back to that minimum.
    ihlWords = struct.unpack('!B', data[14:15])[0] & 0x0F
    tcpStart = 14 + max(ihlWords, 5) * 4
    sourcePort, destPort = struct.unpack('!HH', data[tcpStart:tcpStart + 4])
    return sourcePort, destPort
def parseUdpHeader(data):
    """Return (sourcePort, destPort) from the UDP header of an IPv4 frame.

    data is the whole captured frame: a 14-byte Ethernet header, then the
    IPv4 header, then the UDP header. The UDP header is located through the
    IPv4 header-length field, so frames carrying IP options are parsed
    correctly instead of being read at a fixed offset of 34.

    Raises struct.error when the frame ends before the two port fields.
    """
    # The low nibble of the first IPv4 byte is the header length in 32-bit
    # words. A valid header is at least 5 words (20 bytes); smaller values
    # are malformed and fall back to that minimum.
    ihlWords = struct.unpack('!B', data[14:15])[0] & 0x0F
    udpStart = 14 + max(ihlWords, 5) * 4
    sourcePort, destPort = struct.unpack('!HH', data[udpStart:udpStart + 4])
    return sourcePort, destPort
def parseIcmpHeader(data):
    """Decode the first eight bytes of the ICMP header of an IPv4 frame.

    data is the whole captured frame: a 14-byte Ethernet header, then the
    IPv4 header, then the ICMP message. The ICMP header is located through
    the IPv4 header-length field rather than a fixed offset.

    Returns a tuple (typeCode, code, checksum, idCode, seq):
        typeCode - ICMP message type (one byte)
        code     - type sub code (one byte)
        checksum - 16-bit checksum
        idCode   - 16-bit identifier
        seq      - 16-bit sequence number
    idCode and seq carry those meanings for echo-style messages; other
    message types use the same four bytes differently.

    Raises struct.error when the frame ends before these eight bytes.
    """
    # The low nibble of the first IPv4 byte is the header length in 32-bit
    # words. A valid header is at least 5 words (20 bytes); smaller values
    # are malformed and fall back to that minimum.
    ihlWords = struct.unpack('!B', data[14:15])[0] & 0x0F
    icmpStart = 14 + max(ihlWords, 5) * 4
    # Type and code are ONE byte each; reading them as 16-bit fields would
    # misalign every field that follows.
    typeCode, code, checksum, idCode, seq = struct.unpack(
        '!BBHHH', data[icmpStart:icmpStart + 8])
    return typeCode, code, checksum, idCode, seq
def parseIpAddr(data):
    """Render the first four bytes of data as a dotted-decimal IP string.

    Bytes beyond the fourth are ignored. Raises struct.error when data
    holds fewer than four bytes.
    """
    octets = struct.unpack('!4B', data[0:4])
    return '.'.join(str(octet) for octet in octets)
def _packetMatchesFilter(frame, ipFilter, portFilter):
    """Return True when a captured frame passes the IP/port capture filter.

    Only called when at least one of the two filters is set.
    """
    nextType = parseEthernetHeader(frame)[2]
    if nextType != 0x0800:
        # Only IPv4 frames can match an address or port filter.
        return False
    proto, sourceIpStr, destIpStr = parseIpHeader(frame)
    ipMatches = ipFilter in (sourceIpStr, destIpStr)
    if proto == 6:       # TCP
        sourcePort, destPort = parseTcpHeader(frame)
    elif proto == 17:    # UDP
        sourcePort, destPort = parseUdpHeader(frame)
    else:
        # ICMP (1) and other port-less protocols are selected by address only.
        return ipMatches
    portMatches = portFilter in (sourcePort, destPort)
    if ipFilter and portFilter:
        return ipMatches and portMatches
    if ipFilter:
        return ipMatches
    return portMatches


def socketSniffer(fileName,ipFilter,portFilter,maxSize, maxPackets):
    """Capture frames from a raw packet socket into a pcap file.

    Args:
        fileName: path of the pcap file to create (overwritten if present).
        ipFilter: dotted-decimal IP string; keep only packets whose source
            or destination address equals it. A falsy value disables it.
        portFilter: integer port; keep only TCP/UDP packets whose source or
            destination port equals it. A falsy value disables it.
        maxSize: stop once more than this many megabytes were captured.
        maxPackets: stop once this many packets were written.

    With both filters set a TCP/UDP packet must satisfy both; packets of
    other IP protocols are matched on the address alone. Non-IPv4 frames
    never pass an active filter. Progress and a final summary are printed
    to stdout. Opening the socket uses AF_PACKET, which is Linux-specific
    and normally needs elevated privileges; when the socket cannot be
    created, an error line is printed and the function returns without
    touching the file.
    """
    try:
        # Protocol 0x0003 asks the packet socket for frames of every protocol.
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW ,socket.ntohs(0x0003))
    except socket.error as msg:
        print('[!] Socket could not be created. Error Code : ' + str(msg.errno) + ' Message ' + str(msg.strerror))
        return
    print('[*] Socket successfully created')
    packetCounter = 0
    sizeCounter = 0
    maxSize = maxSize * 1024 * 1024
    try:
        # build pcap file header and output
        outputPcapPFile(fileName)
        while packetCounter < maxPackets and sizeCounter <= maxSize:
            frame = s.recvfrom(65565)[0]
            pLen = len(frame)
            if ipFilter or portFilter:
                try:
                    packetOut = _packetMatchesFilter(frame, ipFilter, portFilter)
                except struct.error:
                    # A truncated frame cannot be matched; skip it instead
                    # of aborting the whole capture.
                    packetOut = False
                if not packetOut:
                    continue
            ouputPcapPacket(fileName ,pLen, frame)
            # counters
            sizeCounter += pLen
            packetCounter += 1
    finally:
        # Release the raw socket even when the capture is interrupted.
        s.close()
    printStr = "[*] PCAP Complete"
    printStr += "\n - Total packet count: " + str(packetCounter)
    printStr += "\n - PCAP size in MB: " + str((float(sizeCounter) / 1024) / 1024)
    printStr += "\n - PCAP filename: " + str(fileName)
    print(printStr)
# Demo capture settings used when this file is run as a script.
fileName2 = 'test.pcap'   # output capture file
ipFilter = 0              # falsy value: no address filtering
portFilter = 443          # keep only traffic to or from this port
maxSize = 100             # capture size limit in megabytes
maxPackets = 1000         # capture packet-count limit

# Start the capture only on direct execution, so importing this module to
# reuse the parsers does not open a raw socket as a side effect.
if __name__ == '__main__':
    socketSniffer(fileName2,ipFilter,portFilter,maxSize,maxPackets)
# (GitHub page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")