Created
October 21, 2024 14:34
-
-
Save luca-m/ac1d604f2da6aace22afc36bc81a6295 to your computer and use it in GitHub Desktop.
Gepetto IDA Pro script without dependencies and with gpt-4o
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
import json | |
import idaapi | |
import ida_hexrays | |
import ida_kernwin | |
import idc | |
import os | |
import re | |
import textwrap | |
import threading | |
import requests | |
# ============================================================================= | |
# Setup the context menu and hotkey in IDA | |
# ============================================================================= | |
# API key sent as the bearer token with every model request. Empty by default.
OPENAI_API_KEY = ""
# Name of the chat model placed in the "model" field of each request.
OPENAI_MODEL = "gpt-4o-mini"
class GepettoPlugin(idaapi.plugin_t):
    """
    IDA plugin object for Gepetto.

    Registers the "Explain function" and "Rename variables" actions, attaches
    them to the Edit menu and installs the UI hook that adds them to the
    Pseudocode context menu.
    """
    flags = 0
    explain_action_name = "gepetto:explain_function"
    explain_menu_path = "Edit/Gepetto/Explain function"
    rename_action_name = "gepetto:rename_function"
    rename_menu_path = "Edit/Gepetto/Rename variables"
    wanted_name = 'Gepetto'
    wanted_hotkey = ''
    comment = "Uses gpt-4 to enrich the decompiler's output"
    help = "See usage instructions on GitHub"
    # ContextMenuHooks instance, created in init() and removed in term().
    menu = None

    def init(self):
        """
        Register the actions and the context menu hook.

        :return: idaapi.PLUGIN_SKIP if the decompiler is not available,
                 idaapi.PLUGIN_KEEP otherwise.
        """
        # Check whether the decompiler is available
        if not ida_hexrays.init_hexrays_plugin():
            return idaapi.PLUGIN_SKIP

        # Function explaining action
        explain_action = idaapi.action_desc_t(self.explain_action_name,
                                              'Explain function',
                                              ExplainHandler(),
                                              "Ctrl+Alt+G",
                                              'Use gpt-4 to explain the currently selected function',
                                              199)
        idaapi.register_action(explain_action)
        idaapi.attach_action_to_menu(self.explain_menu_path, self.explain_action_name, idaapi.SETMENU_APP)

        # Variable renaming action
        rename_action = idaapi.action_desc_t(self.rename_action_name,
                                             'Rename variables',
                                             RenameHandler(),
                                             "Ctrl+Alt+R",
                                             "Use gpt-4 to rename this function's variables",
                                             199)
        idaapi.register_action(rename_action)
        idaapi.attach_action_to_menu(self.rename_menu_path, self.rename_action_name, idaapi.SETMENU_APP)

        # Register context menu actions
        self.menu = ContextMenuHooks()
        self.menu.hook()
        return idaapi.PLUGIN_KEEP

    def run(self, arg):
        """Nothing to do: all functionality is exposed through the actions."""
        pass

    def term(self):
        """Undo everything init() set up: menu entries, actions and UI hook."""
        idaapi.detach_action_from_menu(self.explain_menu_path, self.explain_action_name)
        idaapi.detach_action_from_menu(self.rename_menu_path, self.rename_action_name)
        # Detaching only removes the menu entries; the actions (and their
        # hotkeys) stay registered unless they are explicitly unregistered.
        idaapi.unregister_action(self.explain_action_name)
        idaapi.unregister_action(self.rename_action_name)
        if self.menu:
            self.menu.unhook()
            self.menu = None
        return
# ----------------------------------------------------------------------------- | |
class ContextMenuHooks(idaapi.UI_Hooks):
    """UI hook that offers the Gepetto actions in the Pseudocode popup menu."""

    def finish_populating_widget_popup(self, form, popup):
        """
        Attach both Gepetto actions under "Gepetto/" when the popup belongs
        to a Pseudocode view; do nothing for any other widget type.
        :param form: The widget the popup is being built for
        :param popup: Handle to the popup menu being populated
        """
        if idaapi.get_widget_type(form) != idaapi.BWN_PSEUDOCODE:
            return
        action_names = (GepettoPlugin.explain_action_name,
                        GepettoPlugin.rename_action_name)
        for action_name in action_names:
            idaapi.attach_action_to_popup(form, popup, action_name, "Gepetto/")
# ----------------------------------------------------------------------------- | |
def comment_callback(address, view, response):
    """
    Callback that sets a comment at the given address.
    :param address: The address of the function to comment
    :param view: A handle to the decompiler window
    :param response: The comment to add
    """
    # Wrap the response to 80 columns, one original line at a time. Wrapping
    # the whole text at once with replace_whitespace=False counts the
    # embedded newlines as ordinary characters and yields ragged lines.
    wrapped_lines = []
    for line in response.splitlines():
        # textwrap.wrap() returns [] for a blank line: keep it as an empty
        # line so the paragraph breaks of the response are preserved.
        wrapped_lines.extend(textwrap.wrap(line, 80, replace_whitespace=False) or [""])
    response = "\n".join(wrapped_lines)

    # Add the response as a comment in IDA.
    idc.set_func_cmt(address, response, 0)

    # Refresh the window so the comment is displayed properly
    if view:
        view.refresh_view(False)
    print("Query to model finished!")
# ----------------------------------------------------------------------------- | |
class ExplainHandler(idaapi.action_handler_t):
    """
    This handler is tasked with querying the model for an explanation of the
    given function. Once the reply is received, it is added as a function
    comment.
    """
    def __init__(self):
        idaapi.action_handler_t.__init__(self)

    def activate(self, ctx):
        """
        Decompile the function under the cursor and ask the model to explain it.
        :param ctx: The action context supplied by IDA (its widget is used
                    to find the decompiler view to refresh)
        :return: 1 when a request was sent, 0 when nothing could be decompiled
        """
        # Read the address once so the request and the callback agree on it.
        ea = idaapi.get_screen_ea()
        try:
            decompiler_output = ida_hexrays.decompile(ea)
        except ida_hexrays.DecompilationFailure as e:
            print(f"Gepetto: decompilation failed: {e}")
            return 0
        # decompile() returns None when the address is not inside a function;
        # without this check the literal text "None" would be sent to the model.
        if decompiler_output is None:
            print("Gepetto: no function to decompile at the current address.")
            return 0

        v = ida_hexrays.get_widget_vdui(ctx.widget)
        query_model_async("Can you explain what the following C function does and suggest a better name for it?\n"
                          + str(decompiler_output),
                          functools.partial(comment_callback, address=ea, view=v))
        return 1

    # This action is always available.
    def update(self, ctx):
        return idaapi.AST_ENABLE_ALWAYS
# ----------------------------------------------------------------------------- | |
def rename_callback(address, view, response, retries=0):
    """
    Callback that extracts a JSON dictionary of old names and new names from
    the response and sets them in the pseudocode.
    :param address: The address of the function to work on
    :param view: A handle to the decompiler window
    :param response: The response from the model
    :param retries: How many times the model has already been asked to repair
                    its answer for this request (used to bound the retries)
    """
    max_retries = 3

    def ask_model_to_fix(reason, prompt):
        # Re-query the model, but only a bounded number of times so a model
        # that keeps answering with broken JSON cannot loop forever.
        if retries >= max_retries:
            print(f"{reason} Giving up after {retries} attempt(s) to fix it.")
            return
        print(f"{reason} Asking the model to fix it...")
        # Keep working on the function the request was made for: the cursor
        # may have moved to another function while waiting for the reply.
        query_model_async(prompt,
                          functools.partial(rename_callback, address=address, view=view,
                                            retries=retries + 1))

    j = re.search(r"\{[^}]*?\}", response)
    if not j:
        ask_model_to_fix("Cannot extract valid JSON from the response.",
                         "The JSON document provided in this response is invalid. Can you fix it?\n" + response)
        return
    try:
        names = json.loads(j.group(0))
    except json.decoder.JSONDecodeError:
        ask_model_to_fix("The JSON document returned is invalid.",
                         "Please fix the following JSON document:\n" + j.group(0))
        return

    # The rename function needs the start address of the function
    func = idaapi.get_func(address)
    if func is None:
        print("Cannot rename variables: the address does not belong to a function.")
        return
    function_addr = func.start_ea

    replaced = []
    for old_name, new_name in names.items():
        # The model's output is untrusted: skip anything that is not a usable name.
        if not isinstance(new_name, str) or not new_name:
            continue
        if ida_hexrays.rename_lvar(function_addr, old_name, new_name):
            replaced.append(old_name)

    # Update possible names left in the function comment
    comment = idc.get_func_cmt(address, 0)
    if comment and replaced:
        for old_name in replaced:
            # Escape the name and use a function as replacement so that neither
            # side is interpreted as regular expression / backreference syntax.
            comment = re.sub(r'\b%s\b' % re.escape(old_name),
                             lambda _match, new_name=names[old_name]: new_name,
                             comment)
        idc.set_func_cmt(address, comment, 0)

    # Refresh the window to show the new names
    if view:
        view.refresh_view(True)
    print(f"Query finished! {len(replaced)} variable(s) renamed.")
# ----------------------------------------------------------------------------- | |
class RenameHandler(idaapi.action_handler_t):
    """
    This handler requests new variable names from the model and updates the
    decompiler's output.
    """
    def __init__(self):
        idaapi.action_handler_t.__init__(self)

    def activate(self, ctx):
        """
        Decompile the function under the cursor and ask the model for better
        variable names.
        :param ctx: The action context supplied by IDA (its widget is used
                    to find the decompiler view to refresh)
        :return: 1 when a request was sent, 0 when nothing could be decompiled
        """
        # Read the address once so the request and the callback agree on it.
        ea = idaapi.get_screen_ea()
        try:
            decompiler_output = ida_hexrays.decompile(ea)
        except ida_hexrays.DecompilationFailure as e:
            print(f"Gepetto: decompilation failed: {e}")
            return 0
        # decompile() returns None when the address is not inside a function;
        # without this check the literal text "None" would be sent to the model.
        if decompiler_output is None:
            print("Gepetto: no function to decompile at the current address.")
            return 0

        v = ida_hexrays.get_widget_vdui(ctx.widget)
        # Note the trailing space after "names": the adjacent literals are
        # concatenated as-is, and without it the prompt read "namesand values".
        query_model_async("Analyze the following C function:\n" + str(decompiler_output) +
                          "\nSuggest better variable names, reply with a JSON array where keys are the original names "
                          "and values are the proposed names. Do not explain anything, only print the JSON "
                          "dictionary.",
                          functools.partial(rename_callback, address=ea, view=v))
        return 1

    # This action is always available.
    def update(self, ctx):
        return idaapi.AST_ENABLE_ALWAYS
# ============================================================================= | |
# Model interaction | |
# ============================================================================= | |
def query_model(query, cb, max_tokens=6500):
    """
    Function which sends a query to the model API and calls a callback when the response is available.
    Blocks until the response is received.
    Errors are reported with print() and the callback is not invoked.
    :param query: The request to send to the model API.
    :param cb: The function to which the response will be passed.
    :param max_tokens: Value sent as the "max_tokens" field of the request.
    """
    # Prefer the key configured in this file; fall back to the environment so
    # the key does not have to be stored in the script. `os` is imported at
    # the top of the file.
    api_key = OPENAI_API_KEY or os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("Error: API key not found.")
        return

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    data = {
        "model": OPENAI_MODEL,
        "messages": [
            {"role": "user", "content": query},
        ],
        "max_tokens": max_tokens,
        "temperature": 0.6,
        "top_p": 1,
        "frequency_penalty": 1,
        "presence_penalty": 1,
    }

    try:
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",  # Adjust this to your API endpoint
            headers=headers,
            json=data,
            timeout=60
        )
    except requests.exceptions.RequestException as e:
        print(f"Error during the API request: {e}")
        return

    if response.status_code != 200:
        print(f"API request failed with status code {response.status_code}: {response.text}")
        return

    # A 200 reply can still have an unexpected shape; this function runs in a
    # worker thread, so an uncaught exception here would only kill the thread
    # and leave the user without any feedback.
    try:
        result = response.json()['choices'][0]['message']['content']
    except (ValueError, KeyError, IndexError, TypeError) as e:
        print(f"Unexpected response from the model API: {e!r}")
        return
    if not isinstance(result, str):
        print("The model API returned no text content.")
        return

    # The callback modifies the database, so it is handed over to IDA through
    # execute_sync with MFF_WRITE instead of being called from this thread.
    ida_kernwin.execute_sync(functools.partial(cb, response=result), ida_kernwin.MFF_WRITE)
# ----------------------------------------------------------------------------- | |
def query_model_async(query, cb):
    """
    Run query_model(query, cb) on a new thread and return immediately.
    :param query: The request to send to the model API.
    :param cb: The function to which the response will be passed.
    """
    print("Request sent to model API asynchronously...")
    worker = threading.Thread(target=query_model, args=(query, cb))
    worker.start()
# ============================================================================= | |
# Main | |
# ============================================================================= | |
def PLUGIN_ENTRY():
    """Plugin entry point: build and return a new GepettoPlugin instance."""
    plugin = GepettoPlugin()
    return plugin
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment