Skip to content

Instantly share code, notes, and snippets.

@0xpizza
Created January 4, 2022 19:51
Show Gist options
  • Save 0xpizza/66b316a9946fbc120b673b66345192b4 to your computer and use it in GitHub Desktop.
Save 0xpizza/66b316a9946fbc120b673b66345192b4 to your computer and use it in GitHub Desktop.
Snoop and craft ICMP packets
import asyncio
import dataclasses
import enum
import socket
import struct
import sys
RUNNING_AS_ADMIN = False
if sys.platform == 'win32':
import ctypes
if ctypes.windll.shell32.IsUserAnAdmin() != 0:
RUNNING_AS_ADMIN = True
else:
import os
if os.getuid() == 0:
RUNNING_AS_ADMIN = True
class IcmpType(enum.IntEnum):
ECHO_REPLY = 0
DEST_UNREACHABLE = 3
REDIRECT_MSG = 5
ECHO_REQUEST = 8
TIMESTAMP_REQUEST = 13
TIMESTAMP_REPLY = 14
ECHO_REQUESTX = 42
ECHO_REPLYX = 43
@dataclasses.dataclass
class Icmp4Packet:
type : IcmpType = IcmpType.ECHO_REQUEST
code : int = 0
checksum : int = -1
param : bytes = b'\0\0\0\0'
data : bytes = b''
def __post_init__(self):
if self.type > 255:
raise ValueError('ICMP type must be <= 255')
# convert to enum, if available
if not isinstance(self.type, IcmpType):
for t in IcmpType:
if self.type == t:
self.type = t
if self.checksum == -1:
self.checksum = self.compute_checksum()
if self.checksum > 0xffff:
raise ValueError(f'Checksum (0x{self.checksum:x}) must be <= 0xffff')
if isinstance(self.checksum, bytes):
if len(self.checksum) == 2:
self.checksum = struct.pack('!H', self.checksum)
else:
raise ValueError('Checksum must be 2 bytes')
if isinstance(self.data, str):
self.data = self.data.encode('latin1')
def compute_checksum(self):
data = bytes((self.type, self.code)) + self.param + self.data
if len(data) % 2 == 1:
data += b'\0'
data = struct.unpack(f'!{len(data) // 2}H', data)
checksum = sum(data)
while checksum > 0xffff:
checksum = (checksum & 0xffff) + (checksum >> 16)
checksum = ~checksum & 0xffff
return checksum
def tobytes(self):
"""Packs ICMP packet ready to be sent"""
return (
struct.pack(f'!BBH', self.type, self.code, self.checksum) +
self.param.zfill(4) + self.data
)
@property
def valid_checksum(self):
return self.checksum == self.compute_checksum()
@classmethod
def frombytes(cls, data):
return cls(*struct.unpack(f'!BBH4s{len(data)-8}s', data))
class Icmp4Server:
"""Loosely based on the asyncio.DatagramProtocol class"""
MAX_RECV = 65535 - 8
@staticmethod
def _decode_ipv4_header(data) -> tuple[int, tuple[str, int], bytes]:
"""Extract source address and data from a raw packet.
Returns a tuple: (IP Protocol, source address, data)
Currently only supports IPv4
"""
ip_version = (data[0] & 0xf0) >> 4
if ip_version == 4:
header_len = data[0] & 0xf
header_len *= 4 # ihl is expressed as 32-bit words
header, data = data[:header_len], data[header_len:]
ipproto = header[9]
srcip = '.'.join(map(str, header[12:16]))
dstip = '.'.join(map(str, header[16:20]))
elif ip_version == 6:
return None
else:
raise ValueError(f'Invalid IP Protocol: {ip_version}')
return (ipproto, (srcip, dstip), data)
def __init__(self, bind_address=None):
"""Initialize the ICMP server to accept ICMP Echo traffic"""
self.loop = asyncio.get_running_loop()
self._egress_queue = asyncio.Queue()
self._ingress_queue = asyncio.Queue()
if bind_address is None:
# 0.0.0.0 doesn't cut it; must bind to actual ip address.
bind_address = socket.gethostbyname(socket.gethostname())
try:
# windows only allows us to promiscuously listen on IPPROTO_IP
# therefore, we must send and listen promiscuously on different sockets.
sock_listener = socket.socket(type=socket.SOCK_RAW, proto=socket.IPPROTO_IP)
sock_sender = socket.socket(type=socket.SOCK_RAW, proto=socket.IPPROTO_ICMP)
except OSError as e:
raise OSError(
'Creating raw sockets requires administrative privileges.'
) from e
# set listener to promiscuous mode
sock_listener.bind((bind_address, 0))
sock_listener.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, True)
sock_listener.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
self.bind_address = bind_address
self.sock_listener = sock_listener
self.sock_sender = sock_sender
async def start_snooper(self):
"""Snoop all packets on the interface, extracting ICMP packets.
Yes, all packets. Use with care.
"""
proto_icmp = socket.getprotobyname('icmp')
self.loop = asyncio.get_running_loop()
self.loop.create_task(self._packet_sender())
self.loop.create_task(self._packet_receiver())
while True:
packet = await self.loop.sock_recv(self.sock_listener, self.MAX_RECV)
try:
hostdata = self._decode_ipv4_header(packet)
except ValueError as e:
continue
if hostdata is None: # ipv6 packet detected
continue
if hostdata[0] == proto_icmp:
srcip, dstip = hostdata[1]
if dstip == self.bind_address:
icmp_packet = Icmp4Packet.frombytes(hostdata[2])
self._ingress_queue.put_nowait(
self.loop.run_in_executor(None,
self.packet_received, icmp_packet, srcip
)
)
async def _packet_receiver(self):
while True:
await ( await self._ingress_queue.get() )
async def _packet_sender(self):
while True:
host, data = await self._egress_queue.get()
await self.loop.run_in_executor(None,
self.sock_sender.sendto, data, host
)
def send_to(self, host:str, data):
host = (host, 0) # icmp has no ports, but sendto requires a port.
if isinstance(data, bytes):
data = Icmp4Packet(data=data)
if not isinstance(data, Icmp4Packet):
raise TypeError(f'Sent data must be {bytes!r} or {Icmp4Packet!r} not {data!r}')
self._egress_queue.put_nowait((host, data.tobytes()))
def packet_received(self, icmp_packet:Icmp4Packet, srcip:str) -> None:
"""
Subclass me!
Called when an ICMP packet arrives on the network.
WARNING: other processes may be aware of this packet too,
and any echo reply processes may have responded to it by
the time this code starts running.
"""
class MyIcmpServer(Icmp4Server):
def packet_received(self, packet, src):
print(src, packet.data.hex())
async def amain():
server = MyIcmpServer()
await server.start_snooper()
def main():
if not RUNNING_AS_ADMIN:
print('Please run this script with elevated privileges.')
raise SystemExit(1)
try:
asyncio.run(amain(), debug=True)
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment