Skip to content

Instantly share code, notes, and snippets.

@efaisal
Created August 5, 2019 15:27
Show Gist options
  • Save efaisal/d3e2820d8e4a6975c4e9c55abee28412 to your computer and use it in GitHub Desktop.
Save efaisal/d3e2820d8e4a6975c4e9c55abee28412 to your computer and use it in GitHub Desktop.
POC using ICMP to exchange info
#!/usr/bin/env python
import random
import socket
import select
import struct
import time
import platform
def checksum(frame):
    """Return the 16-bit one's-complement checksum of *frame*.

    The input is summed as little-endian 16-bit words with end-around
    carry, and the folded sum is bit-inverted.  Because the words are read
    little-endian, the result must also be stored little-endian when it is
    written back into a packet.

    Args:
        frame: Bytes-like object to checksum.  An odd-length input is
            treated as if it were followed by a single zero byte.

    Returns:
        The checksum as an int in the range 0..0xffff.
    """
    if len(frame) % 2:
        # The pairwise loop below would index past the end on odd lengths;
        # pad with a zero byte, as the Internet checksum algorithm specifies.
        frame = bytes(frame) + b'\x00'
    total = 0
    for i in range(0, len(frame), 2):
        total += frame[i] | (frame[i + 1] << 8)
        # Fold the carry back in so the running sum stays within 16 bits.
        total = (total & 0xffff) + (total >> 16)
    return ~total & 0xffff
def icmp_echo(hdr_id=None, seq=1, embed='', mtu=228):
    """Build an ICMP echo request (type 8, code 0) carrying *embed*.

    The payload is zero-padded so that header plus payload always occupy
    ``mtu - 20`` bytes (``mtu - 28`` bytes of payload after the 8-byte
    header).

    Args:
        hdr_id: Identifier for the header.  A falsy value (``None`` or 0)
            makes the function pick a random identifier in 1..65535.
        seq: Sequence number; packed as a signed 16-bit integer.
        embed: Payload as ``str`` (encoded as UTF-8) or ``bytes``.
        mtu: Size budget; the payload may use at most ``mtu - 28`` bytes.

    Returns:
        A ``(hdr_id, packet)`` tuple, where ``packet`` is the complete ICMP
        message as ``bytes`` with its checksum filled in.

    Raises:
        TypeError: If *embed* is neither ``str`` nor ``bytes``.
        ValueError: If the encoded payload exceeds ``mtu - 28`` bytes.
    """
    hdr_type, hdr_code = 8, 0
    if not hdr_id:
        hdr_id = random.randint(1, 65535)
    if isinstance(embed, str):
        embed = embed.encode('utf8')
    elif not isinstance(embed, bytes):
        raise TypeError('Unsupported type for `embed`')
    allowed_len = mtu - 28
    if len(embed) > allowed_len:
        # ValueError is still caught by callers that catch Exception.
        raise ValueError('Payload to large for specified MTU')
    body = embed + b'\x00' * (allowed_len - len(embed))
    # First pass: header with a zero checksum field, used as checksum input.
    hdr = struct.pack('bbHHh', hdr_type, hdr_code, 0, hdr_id, seq)
    # checksum() sums little-endian words, so its result has to be stored
    # little-endian whatever the host byte order is.
    chksum = struct.pack('<H', checksum(hdr + body))
    return hdr_id, hdr[:2] + chksum + hdr[4:] + body
def icmp_echo_reply_parser(frame_id=None, seq=None, frame=None):
    """Extract the text payload from an ICMP echo reply.

    Args:
        frame_id: Identifier the reply must carry.
        seq: Sequence number the reply must carry.
        frame: Raw ICMP message (header plus payload) as ``bytes``.

    Returns:
        The payload decoded as UTF-8 with trailing NUL padding removed when
        *frame* is an echo reply (type 0) whose identifier and sequence
        number match; otherwise ``None``.  Frames that are missing or
        shorter than the 8-byte header also yield ``None``.

    Raises:
        UnicodeDecodeError: If a matching reply carries a payload that is
            not valid UTF-8.
    """
    if frame is None or len(frame) < 8:
        # Too short to hold a header; treat it as "not our reply".
        return None
    hdr_type, _code, _chksum, hdr_id, hdr_seq = struct.unpack(
        'bbHHh', frame[:8]
    )
    # Only type 0 is an echo reply; other ICMP messages are not answers
    # to our request even if their bytes happen to line up.
    if hdr_type == 0 and hdr_id == frame_id and hdr_seq == seq:
        return frame[8:].rstrip(b'\x00').decode('utf')
    return None
def icmp_echo_parser(frame_id=None, seq=None, frame=None):
    """Extract the text payload from an ICMP echo request (type 8).

    Args:
        frame_id: Expected identifier, or ``None`` to accept any.
        seq: Expected sequence number, or ``None`` to accept any.
        frame: Raw ICMP message (header plus payload) as ``bytes``.

    Returns:
        * The decoded payload (``str``) when the request's identifier and
          sequence number equal *frame_id* and *seq*.
        * An ``(identifier, sequence, payload)`` tuple when both *frame_id*
          and *seq* are ``None``, so the caller can learn them from the
          first request it sees.
        * ``None`` for anything else, including frames that are missing or
          shorter than the 8-byte header.

    Raises:
        UnicodeDecodeError: If an accepted request carries a payload that
            is not valid UTF-8.
    """
    if frame is None or len(frame) < 8:
        # Too short to hold a header; nothing to parse.
        return None
    hdr_type, _code, _chksum, hdr_id, hdr_seq = struct.unpack(
        'bbHHh', frame[:8]
    )
    if hdr_type != 8:
        return None
    if hdr_id == frame_id and hdr_seq == seq:
        return frame[8:].rstrip(b'\x00').decode('utf')
    if frame_id is None and seq is None:
        return hdr_id, hdr_seq, frame[8:].rstrip(b'\x00').decode('utf')
    return None
def icmp_echo_reply(hdr_id, seq, embed='', mtu=228):
    """Build an ICMP echo reply (type 0, code 0) carrying *embed*.

    The payload is zero-padded so that header plus payload always occupy
    ``mtu - 20`` bytes (``mtu - 28`` bytes of payload after the 8-byte
    header).

    Args:
        hdr_id: Identifier to place in the header (normally the one taken
            from the request being answered).
        seq: Sequence number; packed as a signed 16-bit integer.
        embed: Payload as ``str`` (encoded as UTF-8) or ``bytes``.
        mtu: Size budget; the payload may use at most ``mtu - 28`` bytes.

    Returns:
        A ``(hdr_id, packet)`` tuple, where ``packet`` is the complete ICMP
        message as ``bytes`` with its checksum filled in.

    Raises:
        TypeError: If *embed* is neither ``str`` nor ``bytes``.
        ValueError: If the encoded payload exceeds ``mtu - 28`` bytes.
    """
    hdr_type, hdr_code = 0, 0
    if isinstance(embed, str):
        embed = embed.encode('utf8')
    elif not isinstance(embed, bytes):
        raise TypeError('Unsupported type for `embed`')
    allowed_len = mtu - 28
    if len(embed) > allowed_len:
        # ValueError is still caught by callers that catch Exception.
        raise ValueError('Payload to large for specified MTU')
    body = embed + b'\x00' * (allowed_len - len(embed))
    # First pass: header with a zero checksum field, used as checksum input.
    hdr = struct.pack('bbHHh', hdr_type, hdr_code, 0, hdr_id, seq)
    # checksum() sums little-endian words, so its result has to be stored
    # little-endian whatever the host byte order is.
    chksum = struct.pack('<H', checksum(hdr + body))
    return hdr_id, hdr[:2] + chksum + hdr[4:] + body
def _await_reply(sock, packet_id, seq, timeout, ignore=None):
    """Wait for an echo reply matching *packet_id*/*seq* and return its text.

    Packets that do not parse as a matching reply, and replies whose text
    equals *ignore*, are skipped.  Returns ``None`` once *timeout* seconds
    have passed without a usable reply (``timeout=None`` waits forever).
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if deadline is None:
            wait = None
        else:
            wait = max(0.0, deadline - time.monotonic())
        readable, _, _ = select.select([sock], [], [], wait)
        if not readable:
            return None
        datagram, _addr = sock.recvfrom(1024)
        # Skips the first 20 bytes as the IP header -- assumes an IPv4
        # header without options; TODO confirm for the target platform.
        reply = icmp_echo_reply_parser(packet_id, seq, datagram[20:])
        if reply is not None and reply != ignore:
            return reply
        if deadline is not None and time.monotonic() >= deadline:
            # Do not let a stream of unrelated packets extend the wait.
            return None


def _os_report():
    """Collect distribution (when available) and ``uname`` fields as a list."""
    # platform.linux_distribution() no longer exists on Python 3.8+.
    distribution = getattr(platform, 'linux_distribution', None)
    info = list(distribution()) if distribution is not None else []
    info.extend(platform.uname())
    return info


def slave(dest, embed, mtu=228, timeout=2):
    """Run the slave side of the exchange against the host *dest*.

    Each ``(command, expected_reply)`` pair in *embed* is sent as an ICMP
    echo request, and the first reply that is not simply an echo of the
    command is recorded.  The sequence stops at the first timeout or
    unexpected reply.  If the last recorded reply is ``'get os'``, a string
    describing the local OS is sent and whatever reply comes back is
    recorded too.

    Args:
        dest: Address of the command centre.
        embed: Iterable of ``(command, expected_reply)`` pairs.
        mtu: Passed to ``icmp_echo`` to size each packet.
        timeout: Seconds to wait for each reply.

    Returns:
        List of reply strings in the order received.  It may be empty when
        nothing answered in time.

    Note:
        Opens a raw ICMP socket, which typically needs elevated privileges.
    """
    replies = []
    seq = 1
    packet_id = None
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1)
    try:
        for cmd, expected in embed:
            packet_id, payload = icmp_echo(packet_id, seq, cmd, mtu)
            sock.sendto(payload, (dest, 1))
            # A reply echoing the command itself is ignored (presumably the
            # peer's network stack answering the echo request).
            reply = _await_reply(sock, packet_id, seq, timeout, ignore=cmd)
            if reply is None:
                break  # timed out
            replies.append(reply)
            if reply != expected:
                break  # unknown command from the peer; stop the handshake
            seq += 1
        # Guard against an empty list: a timeout on the first command used
        # to raise IndexError here.
        if replies and replies[-1] == 'get os':
            report = str(_os_report())
            packet_id, payload = icmp_echo(packet_id, seq, report, mtu)
            sock.sendto(payload, (dest, 1))
            reply = _await_reply(sock, packet_id, seq, timeout)
            if reply is not None:
                replies.append(reply)
    finally:
        # Release the raw socket even when sending or parsing raises.
        sock.close()
    return replies
def command_centre(rules, mtu=228, timeout=2):
    """Answer a slave's echo requests according to *rules*.

    The first echo request whose text is a key of *rules* fixes the
    identifier and sequence number of the session.  Each recognised command
    is answered with ``rules[command]`` and the expected sequence number is
    advanced.  After ``'get os'`` has been sent, the next matching request
    is taken as the slave's report and returned.

    Args:
        rules: Mapping of command text to reply text.
        mtu: Passed to ``icmp_echo_reply`` to size each packet.
        timeout: Accepted for interface compatibility; not used -- the
            loop blocks until a packet arrives.

    Returns:
        The text of the request received after ``'get os'`` was sent.

    Note:
        Opens a raw ICMP socket, which typically needs elevated privileges.
        The function does not return until the exchange completes.
    """
    packet_id, seq, awaiting_report = None, None, False
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1)
    try:
        while True:
            readable, _, _ = select.select([sock], [], [])
            if not readable:
                continue
            datagram, addr = sock.recvfrom(65535)
            try:
                # Skips the first 20 bytes as the IP header -- assumes an
                # IPv4 header without options; TODO confirm.
                request = icmp_echo_parser(packet_id, seq, datagram[20:])
            except (struct.error, UnicodeDecodeError):
                # Unrelated traffic (e.g. an ordinary ping with a binary
                # payload) must not bring the listener down.
                continue
            if not request:
                continue
            if isinstance(request, tuple):
                new_id, new_seq, request = request
                if request not in rules:
                    # Do not lock the session onto a stranger's ping.
                    continue
                packet_id, seq = new_id, new_seq
            if awaiting_report:
                # Hand the report back instead of discarding it.
                return request
            if request in rules:
                answer = rules[request]
                packet_id, payload = icmp_echo_reply(
                    packet_id, seq, answer, mtu
                )
                sock.sendto(payload, addr)
                seq += 1
                if answer == 'get os':
                    awaiting_report = True
    finally:
        # Release the raw socket on every exit path.
        sock.close()
if __name__ == '__main__':
    # Command-centre side, kept for reference; enable it on the master host
    # in place of the slave section below:
    #
    #     rules = {
    #         'helo master': 'helo slave',
    #         'ur cmd': 'get os',
    #     }
    #     cc = command_centre(rules)
    #     print('Slave report :: OS info')
    #     print(cc)

    # Slave side: each pair is (command to send, reply expected back).
    rules = (
        ('helo master', 'helo slave'),
        ('ur cmd', 'get os'),
    )
    s = slave('192.168.122.99', rules)

    print('Master greet:')
    # The final entry of the result is deliberately left out of the listing.
    for greeting in s[:-1]:
        print(greeting)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment