Skip to content

Instantly share code, notes, and snippets.

@cetfor
Last active October 29, 2020 14:18
Show Gist options
  • Save cetfor/d6fd6939d7333c4cd6ebeee3f19b2225 to your computer and use it in GitHub Desktop.
Save cetfor/d6fd6939d7333c4cd6ebeee3f19b2225 to your computer and use it in GitHub Desktop.
Binary Ninja Update Analysis Test
import argparse
import binaryninja
import sys
import tempfile
import time
import math
import os
from multiprocessing import Pool, TimeoutError, cpu_count
"""
Example running this on Ubuntu 20.04 LTS (same result in Python 2 and Python 3)
user@ubuntu:~/Desktop/binjatest$ python3 test.py target_binary --update-analysis
Spawning 3 worker processes.
`update_analysis` is set to: True
Hello from worker!
Hello from worker!
Hello from worker!
<hangs forever>
user@ubuntu:~/Desktop/binjatest$ python3 test.py target_binary
Spawning 3 worker processes.
`update_analysis` is set to: False
Hello from worker!
Hello from worker!
Hello from worker!
[Task 2] Address for function '__libc_csu_fini': 0x400840
[Task 2] Address for function '_fini': 0x400844
[Task 1] Address for function '__do_global_dtors_aux': 0x4006d0
[Task 1] Address for function 'frame_dummy': 0x400710
[Task 1] Address for function 'main': 0x40071a
[Task 1] Address for function '__libc_csu_init': 0x4007d0
[Task 0] Address for function '_init': 0x400590
[Task 0] Address for function '_start': 0x400610
[Task 0] Address for function 'deregister_tm_clones': 0x400640
[Task 0] Address for function 'register_tm_clones': 0x400680
"""
def worker(task_number, function_address_chunk, temp_file_name, update_analysis):
print("Hello from worker!")
bv = binaryninja.BinaryViewType.get_view_of_file_with_options(temp_file_name, update_analysis=update_analysis)
for address in function_address_chunk:
function = bv.get_function_at(address)
print("[Task {}] Address for function '{}': {}".format(task_number, function.name, hex(address)))
# do whatever analysis you need here...
def main(target, update_analysis):
number_of_cores = cpu_count()
number_of_processes = number_of_cores - 1
if number_of_processes <= 0:
print("This test requires at least 2 cores, cpu_count returned: {}".format(number_of_cores))
sys.exit()
# update_analysis is `True` by default
bv = binaryninja.BinaryViewType.get_view_of_file_with_options(target, update_analysis=update_analysis)
functions = bv.functions
# save this .bndb file so workers can load it and continue analysis on their own tasks...
temp_file_name = os.path.join(tempfile.gettempdir(), next(tempfile._get_candidate_names()))
if not bv.save(temp_file_name):
print("Error: Unable to save temporary bndb file ('{}') to disk.".format(temp_file_name))
return
# split all functions over a set of function "chunks" the workers will process
# we use function names because you cannot Pickle binary ninja function objects since
# they contain ctype objects with pointers. instead, we get a list of function names each
# worker should handle, then have the workers open our saved .bndb file (`temp_file_name`)
# and get their own Function objects to process.
funcs_per_worker = int(math.ceil(len(functions) / number_of_processes))
functions_by_address = [function.start for function in functions]
function_address_chunks = [functions_by_address[i:i+funcs_per_worker] for i in range(0, len(functions_by_address), funcs_per_worker)]
print("Spawning {} worker processes.".format(number_of_processes))
print("`update_analysis` is set to: {}".format(update_analysis))
pool = Pool(processes=number_of_processes)
results = [pool.apply_async(worker, (i, function_address_chunk, temp_file_name, update_analysis,)) for i, function_address_chunk in enumerate(function_address_chunks)]
output = [result.get() for result in results]
# delete our temp file now that we're done with it
os.remove(temp_file_name)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Binary Ninja Update Analysis Pool Test')
parser.add_argument(
'target',
action='store',
type=str,
help='/path/to/<ELF, Mach-O, PE, or .bndb>')
parser.add_argument(
'--update-analysis',
action='store_true',
help='Whether or not to update analysis on BinaryView.')
args = parser.parse_args()
if not os.path.exists(args.target):
print("Error: Unable to locate target '{}'".format(args.target))
sys.exit()
elif not os.path.isfile(args.target):
print("Error: Target is not a valid file ('{}')".format(args.target))
sys.exit()
update_analysis = False
if args.update_analysis:
update_analysis = True
main(args.target, update_analysis)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment