Skip to content

Instantly share code, notes, and snippets.

@zeroSteiner
Last active November 7, 2017 20:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zeroSteiner/7920683 to your computer and use it in GitHub Desktop.
Save zeroSteiner/7920683 to your computer and use it in GitHub Desktop.
Jarvis is a module which provides easy access to convenience functions.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# jarvis.py
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Homepage: https://gist.github.com/zeroSteiner/7920683
# Author: Spencer McIntyre (zeroSteiner)
import argparse
import contextlib
import logging
import os
import re
import signal
import struct
import subprocess
import sys
import time
import zlib
# Public names exported by a star-import of this module.
__all__ = [
'__version__',
'DEFAULT',
'STREAM_STDOUT',
'STREAM_STDERR',
'Jarvis'
]
__version__ = '0.2.15'
# Sentinel accepted by Jarvis(use_colors=...): when passed, the class decides
# for itself whether to colour output (it checks that both streams are ttys
# and that the platform is Linux).
DEFAULT = -1
# Aliases for the interpreter's standard streams, captured at import time.
STREAM_STDERR = sys.stderr
STREAM_STDOUT = sys.stdout
def which(program):
    """Locate an executable the way a shell would.

    :param program: A bare command name (looked up in each ``PATH`` entry) or
        a path containing a directory component (checked as-is).
    :return: The path of the first matching executable file, or ``None`` when
        nothing suitable exists or ``PATH`` is not set.
    """
    def is_exe(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    directory = os.path.split(program)[0]
    if directory:
        # An explicit location was given, so PATH is never consulted.
        return program if is_exe(program) else None

    # A missing PATH variable means "nothing to search", not an error.
    search_path = os.environ.get('PATH')
    if not search_path:
        return None
    for entry in search_path.split(os.pathsep):
        # Some environments quote individual PATH entries.
        exe_file = os.path.join(entry.strip('"'), program)
        if is_exe(exe_file):
            return exe_file
    return None
# 32-bit general purpose registers that gadgets are searched for.
DWORD_REGISTERS = ['EAX', 'ECX', 'EDX', 'EBX', 'ESP', 'EBP', 'ESI', 'EDI']

# A gadget line is matched as PREFIX + <gadget pattern> + SUFFIX, i.e.
#   0x<8 hex digits> :  # INSTR # INSTR # RETN    ** [module] ** ...
MONA_GADGET_PREFIX_REGEX = r'^0x[0-9a-fA-F]{8}\s:\s\s'
MONA_GADGET_SUFFIX_REGEX = r'\s+\*\*.*$'

# Every pattern below is a str.format() template: "{register}" is substituted
# with a register name before the result is compiled as a regular expression.
# Raw strings keep the regex escapes (\s, \[, \d, ...) intact without relying
# on Python passing unknown string escapes through unchanged.

# Load an arbitrary value into the register from the stack.
SET_GADGET_REGEXS = [
    r'# POP {register} # RETN?',
    r'# POP {register} # RETN?\s0x[\da-fA-F]+',
]

# Dereference the register (optionally with a displacement).
READ_GADGET_REGEXS = [
    r'# MOV {register},DWORD PTR [FSDG]S:\[{register}\] # RETN?',
    r'# MOV E..,DWORD PTR [FSDG]S:\[{register}\] # RETN?',
    r'# AD[CD] {register},DWORD PTR [FSDG]S:\[{register}\] # RETN?',
    r'# MOV E..,DWORD PTR [FSDG]S:\[{register}\+[\da-fA-F]+\] # RETN?',
    r'# MOV E..,DWORD PTR [FSDG]S:\[{register}\] # RETN?\s0x[\da-fA-F]+',
    r'# AD[CD] {register},DWORD PTR [FSDG]S:\[{register}\] # RETN?\s0x[\da-fA-F]+',
    r'# MOV E..,DWORD PTR [FSDG]S:\[{register}\+[\da-fA-F]+\] # RETN?\s0x[\da-fA-F]+',
]

# Store some register to the address held in the register.
WRITE_GADGET_REGEXS = [
    r'# MOV DWORD PTR [FSDG]S:\[{register}\],E.. # RETN?',
    r'# MOV DWORD PTR [FSDG]S:\[{register}\+[\da-fA-F]+\],E.. # RETN?',
    r'# MOV DWORD PTR [FSDG]S:\[{register}\],E.. # POP E.. # RETN?',
    r'# MOV DWORD PTR [FSDG]S:\[{register}\+[\da-fA-F]+\],E.. # POP E.. # RETN?',
]

# Store the register to the address held in EAX.
WRITE_EAX_GADGET_REGEXS = [
    r'# MOV DWORD PTR DS:\[EAX\],{register} # RETN?',
    r'# MOV DWORD PTR DS:\[EAX\+[\da-fA-F]+\],{register} # RETN?',
    r'# MOV DWORD PTR DS:\[EAX\],{register} # POP E.. # RETN?',
    r'# MOV DWORD PTR DS:\[EAX\+[\da-fA-F]+\],{register} # POP E.. # RETN?',
]

# Add another register to the register.
ADD_GADGET_REGEXS = [
    r'# AD[CD] {register},E.. # RETN?',
    r'# AD[CD] {register},E.. # RETN?\s0x[\da-fA-F]+',
]

# Clear the register.
SET_ZERO_GADGET_REGEXS = [
    r'# XOR {register},{register} # RETN?',
    r'# MOV {register},0 # RETN?',
    r'# XOR {register},{register} # RETN?\s0x[\da-fA-F]+',
    r'# MOV {register},0 # RETN?\s0x[\da-fA-F]+',
]

# Move the register's value into ESP.
REGISTER_STACK_PIVOTS = [
    r'# XCHG {register},ESP # RETN?',
    r'# XCHG ESP,{register} # RETN?',
    r'# MOV ESP,{register} # RETN?',
    r'# PUSH {register} # POP ESP # RETN?',
    r'# XCHG {register},ESP # POP E.. # RETN?',
    r'# XCHG ESP,{register} # POP E.. # RETN?',
    r'# MOV ESP,{register} # POP E.. # RETN?',
    r'# PUSH {register} # POP ESP # POP E.. # RETN?',
    r'# XCHG {register},ESP # RETN?\s0x[\da-fA-F]+',
    r'# XCHG ESP,{register} # RETN?\s0x[\da-fA-F]+',
    r'# MOV ESP,{register} # RETN?\s0x[\da-fA-F]+',
    r'# PUSH {register} # POP ESP # RETN?\s0x[\da-fA-F]+',
    r'# XCHG {register},ESP # POP E.. # RETN?\s0x[\da-fA-F]+',
    r'# XCHG ESP,{register} # POP E.. # RETN?\s0x[\da-fA-F]+',
    r'# MOV ESP,{register} # POP E.. # RETN?\s0x[\da-fA-F]+',
    r'# PUSH {register} # POP ESP # POP E.. # RETN?\s0x[\da-fA-F]+',
]
class Jarvis(object):
    """Console helper bundling prefixed status output and data dump routines.

    Each ``print_*`` method has a ``vprint_*`` twin that only produces output
    while ``self.verbose`` is true.
    """

    def __init__(self, use_colors=DEFAULT, stream=None, verbose=True):
        """
        :param use_colors: ``True``/``False`` to force ANSI colours on or off,
            or ``DEFAULT`` to enable them only when both streams are ttys on
            Linux.
        :param stream: A single writable stream used for both normal and error
            output, a ``(stdout, stderr)`` pair, or ``None`` for the process's
            standard streams.
        :param verbose: Initial state of the ``vprint_*`` methods.
        """
        if stream is None:
            stream = (sys.stdout, sys.stderr)
        if isinstance(stream, (list, tuple)):
            self.stdout, self.stderr = stream[0], stream[1]
        else:
            self.stdout = self.stderr = stream
        if use_colors == DEFAULT:
            self.use_colors = bool(
                self.stdout.isatty()
                and self.stderr.isatty()
                and sys.platform.startswith('linux')
            )
        else:
            self.use_colors = use_colors
        # Restore the default SIGPIPE disposition so that writing to a closed
        # pipe ends the process instead of raising. The signal does not exist
        # on every platform (e.g. Windows), so only touch it when available.
        if hasattr(signal, 'SIGPIPE'):
            signal.signal(signal.SIGPIPE, signal.SIG_DFL)
        self.verbose = verbose

    def build_argparser(self, description='', version=0.1, use_logging=False):
        """Create an ArgumentParser pre-populated with the common options.

        The returned parser's ``parse_args`` is wrapped so that parsing also
        updates ``self.verbose`` (``-V/--verbose``) or, when *use_logging* is
        set, attaches a console handler to the root logger at the level chosen
        with ``-L/--log``.

        :param description: Description shown in the parser's help output.
        :param version: Value reported by ``-v/--version``.
        :param use_logging: Offer ``-L/--log`` instead of ``-V/--verbose``.
        :return: The configured :class:`argparse.ArgumentParser`.
        """
        parser = argparse.ArgumentParser(conflict_handler='resolve', description=description, fromfile_prefix_chars='@')

        def parse_args_hook(*args, **kwargs):
            arguments = parser._parse_args(*args, **kwargs)
            if hasattr(arguments, 'verbose'):
                self.verbose = arguments.verbose
            if hasattr(arguments, 'loglvl'):
                root_logger = logging.getLogger('')
                # The root logger passes everything; the handler filters.
                root_logger.setLevel(logging.DEBUG)
                console_log_handler = logging.StreamHandler()
                console_log_handler.setLevel(getattr(logging, arguments.loglvl))
                console_log_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)-8s %(message)s"))
                root_logger.addHandler(console_log_handler)
            return arguments

        parser.add_argument('-v', '--version', action='version', version=parser.prog + ' Version: ' + str(version))
        # Keep the original bound method reachable, then shadow it on the
        # instance with the hook defined above.
        parser._parse_args = parser.parse_args
        parser.parse_args = parse_args_hook
        if use_logging:
            parser.add_argument('-L', '--log', dest='loglvl', action='store', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='set the logging level')
        else:
            parser.add_argument('-V', '--verbose', dest='verbose', action='store_true', default=False, help='enable verbose output')
        return parser

    def mute(self):
        """Redirect both output streams to the null device until unmute()."""
        if hasattr(self, 'stdout_old') or hasattr(self, 'stderr_old'):
            # Already muted; muting again would overwrite the saved streams
            # with the null device and make them unrecoverable.
            return
        self.stdout_old = self.stdout
        self.stderr_old = self.stderr
        # Text mode, because every print_* method writes native strings.
        self._mute_sink = open(os.devnull, 'w')
        self.stdout = self._mute_sink
        self.stderr = self._mute_sink

    def unmute(self):
        """Undo mute(), restoring the saved streams; a no-op when not muted."""
        if hasattr(self, 'stdout_old'):
            self.stdout = self.stdout_old
            delattr(self, 'stdout_old')
        if hasattr(self, 'stderr_old'):
            self.stderr = self.stderr_old
            delattr(self, 'stderr_old')
        sink = getattr(self, '_mute_sink', None)
        if sink is not None:
            sink.close()
            self._mute_sink = None

    @staticmethod
    def create_cyclic_pattern(size):
        """Build a non-repeating "Aa0Aa1Aa2..." style pattern.

        The sequence is made of upper/lower/digit triples and is unique for
        the first 26 * 26 * 10 * 3 characters, after which it starts over.

        :param size: Desired length; anything accepted by ``int()``.
        :return: A string of exactly *size* characters (empty if size <= 0).
        """
        size = int(size)
        if size <= 0:
            return ""
        uppers = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        lowers = "abcdefghijklmnopqrstuvwxyz"
        digits = "0123456789"
        cycle = ''.join(up + low + dig for up in uppers for low in lowers for dig in digits)
        repeats = -(-size // len(cycle))  # ceiling division
        return (cycle * repeats)[:size]

    @staticmethod
    def _is_text(data):
        """Tell whether *data* is a text (not byte) string on this Python."""
        return isinstance(data, (str if sys.version_info > (3,) else unicode))

    @staticmethod
    def _printable(data):
        """Return *data* in a form that can be concatenated with native str."""
        if isinstance(data, (bytes, bytearray)) and not isinstance(data, str):
            # latin-1 maps every byte value to exactly one character.
            return bytes(data).decode('latin-1')
        return data

    def _print_prefixed(self, message, stream, marker, color_code):
        """Write *message* with *marker* in front of each of its lines."""
        if self.use_colors:
            prefix = '\033[1;' + color_code + 'm' + marker + ' \033[1;m'
        else:
            prefix = marker + ' '
        stream.write(prefix + (os.linesep + prefix).join(message.split(os.linesep)) + os.linesep)
        stream.flush()

    def print_line(self, message, stream=None):
        """Write *message* followed by a line separator, then flush.

        :param stream: Destination; defaults to this instance's stdout.
        """
        if stream is None:
            stream = self.stdout
        stream.write(message + os.linesep)
        stream.flush()

    def vprint_line(self, *args, **kwargs):
        """print_line(), but only when verbose."""
        if self.verbose:
            self.print_line(*args, **kwargs)

    def print_error(self, message, stream=None):
        """Write *message* with a ``[-]`` prefix (red) to stderr by default."""
        if stream is None:
            stream = self.stderr
        self._print_prefixed(message, stream, '[-]', '31')

    def vprint_error(self, *args, **kwargs):
        """print_error(), but only when verbose."""
        if self.verbose:
            self.print_error(*args, **kwargs)

    def print_hexdump(self, data, base=0, stream=None, encoding='utf-8'):
        """Write a 16-bytes-per-row hex dump with an ASCII column.

        :param data: Bytes to dump; text is encoded with *encoding* first.
        :param base: Value added to every displayed offset.
        :param stream: Destination; defaults to this instance's stdout.
        :param encoding: Codec used when *data* is text.
        """
        if stream is None:
            stream = self.stdout
        if self._is_text(data):
            data = data.encode(encoding)
        data = bytearray(data)
        for offset in range(0, len(data), 16):
            row = data[offset:offset + 16]
            stream.write("{0:08x} ".format(base + offset))
            for column in range(16):
                if column < len(row):
                    stream.write("%02x " % row[column])
                else:
                    # Same width as a real "xx " cell so that the ASCII
                    # column stays aligned on a short final row.
                    stream.write("   ")
                if column == 7:
                    stream.write(" ")
            stream.write(" ")
            ascii_column = ''.join(chr(byte) if 32 <= byte <= 126 else '.' for byte in row)
            stream.write(ascii_column + os.linesep)
        stream.flush()

    def vprint_hexdump(self, *args, **kwargs):
        """print_hexdump(), but only when verbose."""
        if self.verbose:
            self.print_hexdump(*args, **kwargs)

    def print_status(self, message, stream=None):
        """Write *message* with a ``[*]`` prefix (blue) to stdout by default."""
        if stream is None:
            stream = self.stdout
        self._print_prefixed(message, stream, '[*]', '34')

    def vprint_status(self, *args, **kwargs):
        """print_status(), but only when verbose."""
        if self.verbose:
            self.print_status(*args, **kwargs)

    def print_good(self, message, stream=None):
        """Write *message* with a ``[+]`` prefix (green) to stdout by default."""
        if stream is None:
            stream = self.stdout
        self._print_prefixed(message, stream, '[+]', '32')

    def vprint_good(self, *args, **kwargs):
        """print_good(), but only when verbose."""
        if self.verbose:
            self.print_good(*args, **kwargs)

    def print_warning(self, message, stream=None):
        """Write *message* with a ``[!]`` prefix (yellow) to stderr by default."""
        if stream is None:
            stream = self.stderr
        self._print_prefixed(message, stream, '[!]', '33')

    def vprint_warning(self, *args, **kwargs):
        """print_warning(), but only when verbose."""
        if self.verbose:
            self.print_warning(*args, **kwargs)

    def print_code_dump(self, data, block_size=64, hexlify=True, varname='data', flavor='python', include_crc=False, stream=None):
        """Emit *data* as source code that rebuilds it in a variable.

        :param data: Byte string (or text, which is UTF-8 encoded) to dump.
        :param block_size: Number of output characters per emitted line.
        :param hexlify: Hex-encode the data (``python``) or turn it into
            ``\\xNN`` escapes (``c``/``ruby``); when false it is emitted as-is.
        :param varname: Name of the generated variable.
        :param flavor: ``c``, ``python`` (alias ``code``), ``ruby`` or ``raw``
            (case-insensitive). ``raw`` simply prints the data.
        :param include_crc: Append the CRC-32 of the data to the size comment.
        :param stream: Destination; defaults to this instance's stdout.
        :raises Exception: If *flavor* is not one of the supported values.
        """
        if stream is None:
            stream = self.stdout
        flavor = flavor.lower()
        if flavor == 'raw':
            self.print_line(self._printable(data), stream=stream)
            return
        if flavor == 'code':
            flavor = 'python'
        if flavor not in ('c', 'python', 'ruby'):
            raise Exception('Unsupported flavor: ' + flavor)

        # Work on integer byte values so this behaves the same whether the
        # input arrives as a byte string or as text.
        raw = bytearray(data.encode('utf-8') if self._is_text(data) else data)

        message = "buffer size: {0:,} Bytes".format(len(raw))
        if include_crc:
            # crc32() is signed on Python 2 and unsigned on Python 3; masking
            # yields the same 32-bit value on both.
            message += " - crc-32: 0x{0:08x}".format(zlib.crc32(bytes(raw)) & 0xffffffff)
        if flavor == 'c':
            self.print_line('/* ' + message + ' */', stream=stream)
        else:
            self.print_line('# ' + message, stream=stream)

        if not hexlify:
            body = self._printable(data)
        elif flavor == 'python':
            body = ''.join('{0:02x}'.format(byte) for byte in raw)
        else:
            body = ''.join('\\x{0:02x}'.format(byte) for byte in raw)

        # An empty buffer still needs one line so that the variable is defined
        # in the generated code.
        chunk_indexes = list(range(0, len(body), block_size)) or [0]
        for idx in chunk_indexes:
            quoted = '"' + body[idx:idx + block_size] + '"'
            if flavor == 'c':
                if idx == 0:
                    self.print_line("unsigned char {0}[] = ".format(varname), stream=stream)
                buffer_line = '\t ' + quoted
                if idx == chunk_indexes[-1]:
                    buffer_line += ';'
            elif idx == 0:
                buffer_line = varname + ' = ' + quoted
            elif flavor == 'python':
                buffer_line = varname + ' += ' + quoted
            else:
                buffer_line = varname + ' << ' + quoted
            self.print_line(buffer_line, stream=stream)
        if hexlify and flavor == 'python':
            self.print_line(varname + ' = ' + varname + '.decode(\'hex\')', stream=stream)
        return

    @contextlib.contextmanager
    def print_operation_time(self, message=None, verbose=False, stream=None):
        """Context manager reporting how long its body took.

        Nothing is reported if the body raises.

        :param message: Format string with a ``{time}`` field (seconds).
        :param verbose: Report through vprint_status() instead of
            print_status().
        :param stream: Stream handed to the status printer.
        """
        message = (message or "Completed in {time:.3f} seconds")
        start_time = time.time()
        yield
        elapsed = time.time() - start_time
        report = self.vprint_status if verbose else self.print_status
        report(message.format(time=elapsed), stream)

    def print_table(self, rows, verbose=False, stream=None, spaces=2):
        """Print *rows* as left-aligned columns; the first row is the header.

        :param rows: Iterable of rows, each an iterable of cells (cells are
            converted with ``str()``).
        :param verbose: Only print when this instance is verbose.
        :param stream: Destination; defaults to this instance's stdout.
        :param spaces: Number of spaces every line is indented by.
        """
        if verbose and not self.verbose:
            return
        # Materialise everything: the rows are indexed and walked repeatedly.
        rows = [[str(cell) for cell in row] for row in rows]
        if not rows:
            return
        prefix = ' ' * spaces
        title_row = rows[0]
        column_count = len(title_row)
        # A width of at least 1 keeps the format spec valid ("{0:0}" is not a
        # usable string format on every Python version).
        widths = [max(1, max(len(row[col]) for row in rows)) for col in range(column_count)]
        row_fmt_string = ' '.join('{' + str(col) + ':' + str(widths[col]) + '}' for col in range(column_count))
        self.print_line(prefix + row_fmt_string.format(*title_row), stream=stream)
        self.print_line(prefix + row_fmt_string.format(*['-' * len(title) for title in title_row]), stream=stream)
        for row in rows[1:]:
            self.print_line(prefix + row_fmt_string.format(*row), stream=stream)
def action_hexdump(dh, arguments):
    """Handler for the ``hexdump`` sub-command.

    Reads all of ``arguments.datafile`` and passes it to ``dh.print_hexdump``.
    ``arguments.base_address`` (optional string) offsets the displayed
    addresses: a ``0x`` prefix selects hexadecimal, any other leading ``0``
    selects octal, everything else is decimal. An IOError raised while dumping
    is ignored.
    """
    requested = arguments.base_address
    if not requested:
        offset = 0
    else:
        text = requested.lower()
        if text.startswith('0x'):
            radix = 16
        elif text.startswith('0'):
            radix = 8
        else:
            radix = 10
        offset = int(text, radix)
    try:
        contents = arguments.datafile.read()
        dh.print_hexdump(contents, base=offset)
    except IOError:
        pass
def action_code_dump(dh, arguments):
    """Handler for the ``codedump`` sub-command.

    Reads all of ``arguments.datafile`` and has ``dh.print_code_dump`` emit it
    under the name ``arguments.variable_name`` in the ``arguments.out_format``
    flavor.
    """
    contents = arguments.datafile.read()
    dh.print_code_dump(
        contents,
        varname=arguments.variable_name,
        flavor=arguments.out_format,
    )
def find_rop_gadgets_for_registers(file_h, regexs, registers, target_modules=None):
    """Search a mona gadget listing for one gadget per register.

    Patterns are tried in the order given; once a register has a gadget from
    an earlier pattern, later patterns are not tried for it. Within a single
    pattern the whole file is scanned and the last matching line is kept.

    :param file_h: Seekable text file object holding the gadget listing.
    :param regexs: ``str.format`` templates containing ``{register}``.
    :param registers: Register names to substitute into the templates.
    :param target_modules: Optional iterable of module names (compared against
        the lower-cased module of each line); when non-empty, gadgets from
        other modules are ignored.
    :return: ``{register: {'gadget': "<address>, <instructions>", 'module':
        <lower-cased module name>}}`` for every register a gadget was found
        for.
    """
    # Materialise the filter once: it is consulted for every matching line,
    # and a one-shot iterator would be exhausted by the first lookup.
    wanted_modules = set(target_modules) if target_modules is not None else set()
    registers_done = {}
    for gadget_regex in regexs:
        for register in registers:
            if register in registers_done:
                continue
            matcher = re.compile(MONA_GADGET_PREFIX_REGEX + gadget_regex.format(register=register) + MONA_GADGET_SUFFIX_REGEX)
            file_h.seek(0)
            for line in file_h:
                if not matcher.match(line):
                    continue
                line = line.strip()
                # the following is mona-specific
                parts = line.split('** [')
                if len(parts) < 2:
                    # No "** [module] **" trailer, so the module is unknown.
                    continue
                gadget = parts[0].rstrip()
                module = parts[1].split('] **')[0].lower()
                if wanted_modules and module not in wanted_modules:
                    continue
                # First 10 characters are the "0x" address; the instructions
                # start after the 4-character " :  " separator.
                gadget = "{0}, {1}".format(gadget[:10], gadget[14:])
                registers_done[register] = {'gadget': gadget, 'module': module}
    return registers_done
def action_rop_summary(dh, arguments):
    """Handler for the ``ropsummary`` sub-command.

    For each category of gadget, searches ``arguments.datafile`` for one
    gadget per DWORD register and prints what was found.
    ``arguments.modules`` optionally restricts the search to the named
    modules (case-insensitive).
    """
    file_h = arguments.datafile
    rop_search_sets = [
        ('set registers', SET_GADGET_REGEXS),
        ('read from registers', READ_GADGET_REGEXS),
        ('write to registers', WRITE_GADGET_REGEXS),
        ('write to eax', WRITE_EAX_GADGET_REGEXS),
        ('add registers to registers', ADD_GADGET_REGEXS),
        ('zero out registers', SET_ZERO_GADGET_REGEXS),
        ('pivot the stack to a register', REGISTER_STACK_PIVOTS),
    ]
    # A real list: the filter is reused for every search set and every line.
    target_modules = [module.lower() for module in arguments.modules]
    for description, regexs in rop_search_sets:
        dh.print_status('Locating gadgets to ' + description)
        gadgets = find_rop_gadgets_for_registers(file_h, regexs, DWORD_REGISTERS, target_modules=target_modules)
        if len(gadgets) == len(DWORD_REGISTERS):
            dh.print_good('Obtained a complete set')
        else:
            dh.print_status("Obtained gadgets for {0}/{1} registers".format(len(gadgets), len(DWORD_REGISTERS)))
        for gadget_info in gadgets.values():
            dh.print_line("{0} [{1}]".format(gadget_info['gadget'], gadget_info['module']))
def action_cyclic_pattern(dh, arguments):
    """Handler for the ``cyclicpattern`` sub-command.

    Without ``arguments.pattern`` the cyclic pattern of ``arguments.size``
    characters is printed as code. With it, the offset of that 4-character
    sequence inside the pattern is reported. An 8-character pattern is taken
    to be the hex form of a little-endian DWORD (e.g. a register value) and is
    converted to the 4 characters as they appear in memory before searching.
    """
    pattern = dh.create_cyclic_pattern(arguments.size)
    if not arguments.pattern:
        dh.print_code_dump(pattern, hexlify=False, varname='pattern', flavor=arguments.out_format)
        return

    search_pattern = arguments.pattern
    if len(search_pattern) == 8:
        try:
            raw = bytearray.fromhex(search_pattern)
        except ValueError:
            dh.print_error('the search pattern is invalid')
            return
        # The value was read as a little-endian integer; flip it back into
        # memory order.
        raw.reverse()
        search_pattern = raw.decode('latin-1')
    if len(search_pattern) != 4:
        dh.print_error('the search pattern is invalid')
        return
    index = pattern.find(search_pattern)
    if index == -1:
        dh.print_error('could not find the search pattern')
    else:
        dh.print_status('found exact match at ' + str(index))
def action_assemble(dh, arguments):
    """Handler for the ``assemble`` sub-command.

    Feeds ``arguments.datafile`` to ``nasm`` (flat binary output) and prints
    the resulting machine code with ``dh.print_code_dump``. If nasm produces
    no output, its error text is printed instead, with the placeholder input
    path replaced by the real file name.
    """
    nasm_bin = which('nasm')
    if nasm_bin is None:
        dh.print_error('Failed to assemble: nasm was not found')
        return
    nasm_proc = subprocess.Popen([nasm_bin, '-f', 'bin', '-O3', '-o', '/dev/stdout', '/dev/stdin'], stdin=arguments.datafile, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains both pipes while waiting; calling wait() first can
    # block forever once the child fills a pipe buffer.
    data, errors = nasm_proc.communicate()
    if not data:
        dh.print_error('Failed to assemble:')
        if not isinstance(errors, str):
            errors = errors.decode('utf-8', 'replace')
        errors = errors.replace('/dev/stdin', arguments.datafile.name)
        dh.print_error(errors)
        return
    dh.print_code_dump(data, varname=arguments.variable_name, flavor=arguments.out_format)
def action_disassemble(dh, arguments):
    """Handler for the ``disassemble`` sub-command.

    Runs ``objdump`` over ``arguments.datafile`` as raw i386 code (Intel
    syntax) and prints its output starting from the line that begins with
    "Disassembly". If objdump exits with a non-zero status, its error output
    is printed as well.
    """
    objdump_bin = which('objdump')
    if objdump_bin is None:
        dh.print_error('Failed to disassemble: objdump was not found')
        return
    objdump_proc = subprocess.Popen([objdump_bin, '-D', '-b', 'binary', '-m', 'i386', '-M', 'intel', '-l', '/dev/stdin'], stdin=arguments.datafile, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains both pipes while waiting; calling wait() first can
    # block forever once the child fills a pipe buffer.
    output, errors = objdump_proc.communicate()
    if not isinstance(output, str):
        output = output.decode('utf-8', 'replace')
    start = False
    for line in output.splitlines():
        if not start:
            # Skip the banner that precedes the actual listing.
            if not line.startswith('Disassembly'):
                continue
            start = True
        dh.print_line(line.rstrip())
    if objdump_proc.returncode:
        if not isinstance(errors, str):
            errors = errors.decode('utf-8', 'replace')
        if errors.strip():
            dh.print_error(errors.strip())
def main():
    """Command line entry point: parse the arguments and run the chosen action.

    The ``assemble`` and ``disassemble`` sub-commands are only offered when
    ``nasm`` / ``objdump`` can be located. The elapsed time is reported on
    stderr after the action finishes.

    :return: 0 after the action has run.
    """
    jar = Jarvis()
    parser = jar.build_argparser(version=__version__)
    parser.add_argument('-f', '--format', dest='out_format', action='store', default='code', choices=['c', 'code', 'python', 'raw', 'ruby'], help='output the raw pattern')
    subparsers = parser.add_subparsers(help='action')

    parser_hexdump = subparsers.add_parser('hexdump', help='dump a binary files content in hex')
    parser_hexdump.set_defaults(handler=action_hexdump)
    # Binary mode: the dump must show the file's bytes, not decoded text.
    parser_hexdump.add_argument('datafile', action='store', type=argparse.FileType('rb'), help='file to process')
    parser_hexdump.add_argument('-b', '--base-address', dest='base_address', action='store', default=None, help='shift the base address')

    parser_code_dump = subparsers.add_parser('codedump', help='dump a binary files content in a code format')
    parser_code_dump.set_defaults(handler=action_code_dump)
    # NOTE(review): this still opens the input in text mode; on Python 3 that
    # decodes the file before it is dumped -- confirm whether 'rb' is wanted.
    parser_code_dump.add_argument('datafile', action='store', type=argparse.FileType('r'), help='file to process')
    parser_code_dump.add_argument('--var', dest='variable_name', action='store', default='data', help='variable name to use')

    parser_rop_summary = subparsers.add_parser('ropsummary', help='get a summary of useful gadgets form a rop file')
    parser_rop_summary.set_defaults(handler=action_rop_summary)
    parser_rop_summary.add_argument('datafile', action='store', type=argparse.FileType('r'), help='file to process')
    parser_rop_summary.add_argument('modules', action='store', nargs='*', help='source modules for rop gadgets')

    parser_cyclic_pattern = subparsers.add_parser('cyclicpattern', help='create and find a pattern in a cyclic pattern')
    parser_cyclic_pattern.set_defaults(handler=action_cyclic_pattern)
    parser_cyclic_pattern.add_argument('size', action='store', type=int, help='size of the pattern')
    parser_cyclic_pattern.add_argument('-p', '--pattern', dest='pattern', action='store', default=None, required=False, help='pattern to locate')

    if which('nasm'):
        parser_assemble = subparsers.add_parser('assemble', help='assemble a source file')
        parser_assemble.set_defaults(handler=action_assemble)
        parser_assemble.add_argument('--var', dest='variable_name', action='store', default='shellcode', help='variable name to use')
        parser_assemble.add_argument('datafile', action='store', type=argparse.FileType('rb'), help='file to process')

    if which('objdump'):
        parser_disassemble = subparsers.add_parser('disassemble', help='disassemble a raw binary file')
        parser_disassemble.set_defaults(handler=action_disassemble)
        parser_disassemble.add_argument('datafile', action='store', type=argparse.FileType('rb'), help='file to process')

    with jar.print_operation_time(stream=sys.stderr):
        arguments = parser.parse_args()
        if not hasattr(arguments, 'handler'):
            # Sub-commands are optional on Python 3, so no action may have
            # been selected; exit with a usage error rather than a traceback.
            parser.error('too few arguments')
        arguments.handler(jar, arguments)
    return 0


if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment