Skip to content

Instantly share code, notes, and snippets.

@hugsy
Created February 12, 2024 22:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hugsy/714e0038d5d0b1deb7fad1907928252f to your computer and use it in GitHub Desktop.
Save hugsy/714e0038d5d0b1deb7fad1907928252f to your computer and use it in GitHub Desktop.
Run Binary Ninja headlessly using RPyC
import binaryninja
import threading
import typing
import logging
import rpyc
import rpyc.utils.helpers
import rpyc.utils.server
if typing.TYPE_CHECKING:
import rpyc.core.protocol
# Label passed as the first argument of every message box raised by
# rpyc_start() / rpyc_stop().
SERVICE_NAME: str = "Binja-RPyC"
# NOTE(review): not referenced by any code visible in this file - confirm it
# is still needed.
DEBUG: bool = False
# Address handed to the RPyC server as `hostname`.
# NOTE(review): "0.0.0.0" presumably makes the server reachable from every
# network interface; combined with the remote `eval` endpoint exposed by the
# service this deserves a deliberate decision - confirm.
HOST: str = "0.0.0.0"
# TCP port handed to the RPyC server.
PORT: int = 18812
# NOTE(review): not referenced by any code visible in this file - confirm it
# is still needed.
PAGE_SIZE: int = 0x1000
# Thread object running start_service(); None while no service thread is
# registered.
g_ServiceThread = None
# Server object created by start_service(); None until one has been created.
g_Server = None
# Copy of the ambient `bv` global, captured by start_service().
__bv = None
class BinjaRpycService(rpyc.Service):
    """RPyC service giving remote clients access to Binary Ninja.

    The service stores one BinaryView and publishes, through RPyC's
    ``exposed_`` naming convention, the ``binaryninja`` module, the stored
    view, and an expression evaluator.
    """

    ALIASES = ["binja", ]

    def __init__(self, bv):
        """Remember the BinaryView (*bv*) that ``exposed_bv`` hands out."""
        self.bv = bv

    def on_connect(self, conn: rpyc.core.protocol.Connection):
        """Log that the connection *conn* has been opened."""
        logging.info("connect open: {}".format(conn,))

    def on_disconnect(self, conn: rpyc.core.protocol.Connection):
        """Log that the connection *conn* has been closed."""
        # Use the logging module like on_connect() and the rest of the file
        # instead of writing straight to stdout.
        logging.info("connection closed: {}".format(conn,))

    # The whole `binaryninja` module, published as an attribute.
    exposed_binaryninja = binaryninja

    def exposed_bv(self):
        """Return the BinaryView this service was constructed with."""
        return self.bv

    def exposed_eval(self, cmd):
        """Evaluate the Python expression *cmd* and return its value.

        Raises:
            Whatever evaluating *cmd* raises.
        """
        # SECURITY: `cmd` comes from the remote client and is evaluated with
        # no restriction, i.e. any client that can connect can run arbitrary
        # code in this process. Kept as-is because it is part of the service
        # interface; only expose the server to trusted hosts.
        return eval(cmd)
def is_service_started():
    """Tell whether a service thread handle is currently registered.

    Returns:
        True when the module-level ``g_ServiceThread`` is anything other than
        ``None``, False otherwise. Whether that thread is still alive is not
        checked here.
    """
    return not (g_ServiceThread is None)
def start_service(host: str, port: int):
    """Create the RPyC server on *host*:*port* and run it.

    The server object is published in the module-level ``g_Server``. When
    construction raises ``OSError`` the error is logged, ``g_Server`` is left
    at ``None`` and the function returns without serving.

    Args:
        host: Value passed to the server as ``hostname``.
        port: Value passed to the server as ``port``.
    """
    global g_Server, __bv

    g_Server = None
    # `bv` is not a parameter: it is read from the enclosing (global) scope.
    __bv = bv

    try:
        # Bind the BinaryView to the service class, then hand one service
        # instance to the server.
        bound_service = rpyc.utils.helpers.classpartial(BinjaRpycService, bv)
        g_Server = rpyc.utils.server.ThreadedServer(
            bound_service(),
            hostname=host,
            port=port,
            protocol_config={"allow_public_attrs": True},
        )
    except OSError as e:
        logging.error(f"OSError: {str(e)}")
        g_Server = None

    if not g_Server:
        logging.error("failed to start server...")
        return

    logging.info("server successfully started")
    # NOTE(review): per the RPyC documentation start() blocks while serving,
    # which is why this function is run on its own thread - confirm.
    g_Server.start()
def rpyc_start():
    """Run start_service() on a daemon thread and tell the user about it.

    The thread object is stored in the module-level ``g_ServiceThread``.
    """
    global g_ServiceThread

    logging.debug("Starting background service...")
    worker = threading.Thread(
        target=start_service,
        args=(HOST, PORT),
        daemon=True,
    )
    g_ServiceThread = worker
    worker.start()

    # NOTE(review): the box below is shown right after the thread is
    # launched, without waiting to learn whether start_service() managed to
    # create the server.
    binaryninja.show_message_box(
        SERVICE_NAME,
        "Service successfully started, you can use any RPyC client to connect to this instance of Binary Ninja",
        binaryninja.MessageBoxButtonSet.OKButtonSet,
        binaryninja.MessageBoxIcon.InformationIcon,
    )
def shutdown_service() -> bool:
    """Close the server held in the module-level ``g_Server``.

    Returns:
        True when ``g_Server.close()`` completed without raising; False when
        there is no server, or when closing raised (the exception is logged,
        not propagated).
    """
    if g_Server:
        try:
            logging.debug("Shutting down service")
            g_Server.close()
        except Exception as e:
            logging.error(f"Exception: {str(e)}")
            return False
        return True

    logging.warning("Service not started")
    return False
def stop_service() -> bool:
    """Close the RPyC server and reap the background service thread.

    Returns:
        True when the service thread has been joined and its handle in
        ``g_ServiceThread`` cleared; False when no service thread is
        registered, or when the server could not be closed while the thread
        is still running.
    """
    global g_ServiceThread

    if not g_ServiceThread:
        return False

    logging.debug("Stopping service thread")
    if not shutdown_service() and g_ServiceThread.is_alive():
        # The server could not be closed and its thread is still running:
        # joining could block forever and reporting success would be wrong.
        logging.error("Service thread is still running")
        return False

    # Either the server was closed, or the thread had already exited by
    # itself (start_service() returns early when it cannot create the
    # server). In both cases join() returns and the stale handle must be
    # cleared, otherwise is_service_started() would keep answering True.
    g_ServiceThread.join()
    g_ServiceThread = None
    logging.info("Service thread stopped")
    return True
def rpyc_stop(bv: typing.Optional[binaryninja.BinaryView] = None):
    """Stop the service and report the outcome in a message box.

    Args:
        bv: Unused. Kept (now optional) so callers that pass a BinaryView
            keep working, while the argument-less call made by this script
            no longer raises ``TypeError``.
    """
    if stop_service():
        binaryninja.show_message_box(
            SERVICE_NAME,
            "Service successfully stopped",
            binaryninja.MessageBoxButtonSet.OKButtonSet,
            binaryninja.MessageBoxIcon.InformationIcon,
        )
    else:
        binaryninja.show_message_box(
            SERVICE_NAME,
            "An error occured while stopping the service, check logs",
            binaryninja.MessageBoxButtonSet.OKButtonSet,
            binaryninja.MessageBoxIcon.ErrorIcon,
        )
# Running the script toggles the service: stop it when a service thread is
# registered, start it otherwise. Each log line names the action actually
# taken (the two messages used to be swapped).
if is_service_started():
    logging.info("Trying to stop the service")
    # rpyc_stop() takes a `bv` argument that it does not use; pass None
    # explicitly so the call matches its signature.
    rpyc_stop(None)
else:
    logging.info("Trying to start the service")
    rpyc_start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment