Last active
October 29, 2020 14:18
-
-
Save cetfor/d6fd6939d7333c4cd6ebeee3f19b2225 to your computer and use it in GitHub Desktop.
Binary Ninja Update Analysis Test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import binaryninja | |
import sys | |
import tempfile | |
import time | |
import math | |
import os | |
from multiprocessing import Pool, TimeoutError, cpu_count | |
""" | |
Example running this on Ubuntu 20.04 LTS (same result in Python 2 and Python 3) | |
user@ubuntu:~/Desktop/binjatest$ python3 test.py target_binary --update-analysis | |
Spawning 3 worker processes. | |
`update_analysis` is set to: True | |
Hello from worker! | |
Hello from worker! | |
Hello from worker! | |
<hangs forever> | |
user@ubuntu:~/Desktop/binjatest$ python3 test.py target_binary | |
Spawning 3 worker processes. | |
`update_analysis` is set to: False | |
Hello from worker! | |
Hello from worker! | |
Hello from worker! | |
[Task 2] Address for function '__libc_csu_fini': 0x400840 | |
[Task 2] Address for function '_fini': 0x400844 | |
[Task 1] Address for function '__do_global_dtors_aux': 0x4006d0 | |
[Task 1] Address for function 'frame_dummy': 0x400710 | |
[Task 1] Address for function 'main': 0x40071a | |
[Task 1] Address for function '__libc_csu_init': 0x4007d0 | |
[Task 0] Address for function '_init': 0x400590 | |
[Task 0] Address for function '_start': 0x400610 | |
[Task 0] Address for function 'deregister_tm_clones': 0x400640 | |
[Task 0] Address for function 'register_tm_clones': 0x400680 | |
""" | |
def worker(task_number, function_address_chunk, temp_file_name, update_analysis): | |
print("Hello from worker!") | |
bv = binaryninja.BinaryViewType.get_view_of_file_with_options(temp_file_name, update_analysis=update_analysis) | |
for address in function_address_chunk: | |
function = bv.get_function_at(address) | |
print("[Task {}] Address for function '{}': {}".format(task_number, function.name, hex(address))) | |
# do whatever analysis you need here... | |
def main(target, update_analysis): | |
number_of_cores = cpu_count() | |
number_of_processes = number_of_cores - 1 | |
if number_of_processes <= 0: | |
print("This test requires at least 2 cores, cpu_count returned: {}".format(number_of_cores)) | |
sys.exit() | |
# update_analysis is `True` by default | |
bv = binaryninja.BinaryViewType.get_view_of_file_with_options(target, update_analysis=update_analysis) | |
functions = bv.functions | |
# save this .bndb file so workers can load it and continue analysis on their own tasks... | |
temp_file_name = os.path.join(tempfile.gettempdir(), next(tempfile._get_candidate_names())) | |
if not bv.save(temp_file_name): | |
print("Error: Unable to save temporary bndb file ('{}') to disk.".format(temp_file_name)) | |
return | |
# split all functions over a set of function "chunks" the workers will process | |
# we use function names because you cannot Pickle binary ninja function objects since | |
# they contain ctype objects with pointers. instead, we get a list of function names each | |
# worker should handle, then have the workers open our saved .bndb file (`temp_file_name`) | |
# and get their own Function objects to process. | |
funcs_per_worker = int(math.ceil(len(functions) / number_of_processes)) | |
functions_by_address = [function.start for function in functions] | |
function_address_chunks = [functions_by_address[i:i+funcs_per_worker] for i in range(0, len(functions_by_address), funcs_per_worker)] | |
print("Spawning {} worker processes.".format(number_of_processes)) | |
print("`update_analysis` is set to: {}".format(update_analysis)) | |
pool = Pool(processes=number_of_processes) | |
results = [pool.apply_async(worker, (i, function_address_chunk, temp_file_name, update_analysis,)) for i, function_address_chunk in enumerate(function_address_chunks)] | |
output = [result.get() for result in results] | |
# delete our temp file now that we're done with it | |
os.remove(temp_file_name) | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description='Binary Ninja Update Analysis Pool Test') | |
parser.add_argument( | |
'target', | |
action='store', | |
type=str, | |
help='/path/to/<ELF, Mach-O, PE, or .bndb>') | |
parser.add_argument( | |
'--update-analysis', | |
action='store_true', | |
help='Whether or not to update analysis on BinaryView.') | |
args = parser.parse_args() | |
if not os.path.exists(args.target): | |
print("Error: Unable to locate target '{}'".format(args.target)) | |
sys.exit() | |
elif not os.path.isfile(args.target): | |
print("Error: Target is not a valid file ('{}')".format(args.target)) | |
sys.exit() | |
update_analysis = False | |
if args.update_analysis: | |
update_analysis = True | |
main(args.target, update_analysis) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment