Skip to content

Instantly share code, notes, and snippets.

@pedramamini
Last active July 11, 2022 21:10
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pedramamini/4486421 to your computer and use it in GitHub Desktop.
Save pedramamini/4486421 to your computer and use it in GitHub Desktop.
# Joe Sandbox API wrapper.
# REQUIRES: python-requests http://docs.python-requests.org/en/latest/
import sys
import time
import random
import getpass
import requests
try:
import simplejson as json
except ImportError:
import json
# Base URL of the Joe Sandbox web API; the API name (e.g. "analysis/list") is appended to it for every request.
API_URL = "https://joesandbox.joesecurity.org/index.php/api/"
# Optional hard-coded credentials for command line use. When JOE_USER / JOE_PASS are left empty the script prompts for
# them interactively instead. NOTE(review): avoid committing real credentials in this file.
JOE_USER = ""
JOE_PASS = ""
# Set to True to skip the interactive "terms and conditions" prompt when this file is run as a script.
JOE_TAC = False
########################################################################################################################
class joe_api(object):
    """
    Joe Sandbox API wrapper.

    Every call is an HTTP POST to API_URL + <api name> that carries the account username and password as form fields.
    The code is written to run unchanged on both Python 2 and Python 3.

    @see: https://joesandbox.joesecurity.org/index.php/download/Joe%20Sandbox%20WEB%20API.pdf
    """

    # number of times a request is attempted before giving up.
    MAX_ATTEMPTS = 3

    ####################################################################################################################
    def __init__(self, username, password, verify_ssl=False, timeout=None):
        """
        Initialize the interface to Joe Sandbox API with username and password.

        @type  username:   str
        @param username:   Joe Sandbox account name, sent with every request.
        @type  password:   str
        @param password:   Joe Sandbox account password, sent with every request.
        @type  verify_ssl: bool
        @param verify_ssl: Passed to requests as "verify". NOTE: the default (False) is kept for backwards
                           compatibility but disables TLS certificate verification while credentials are being
                           transmitted; pass True unless there is a specific reason not to.
        @type  timeout:    float
        @param timeout:    Optional per-request timeout in seconds, passed to requests. None (default) waits forever.
        """

        self.username = username
        self.password = password
        self.verify_ssl = verify_ssl
        self.timeout = timeout

    ####################################################################################################################
    def _merge_params(self, params=None):
        """
        Combine caller supplied API parameters with the credentials WITHOUT modifying the caller's dictionary.

        @type  params: dict
        @param params: Optional parameters for API.

        @rtype:  dict
        @return: New dictionary, credentials take precedence over identically named supplied keys.
        """

        merged = dict(params or {})
        merged["username"] = self.username
        merged["password"] = self.password

        return merged

    ####################################################################################################################
    @staticmethod
    def _upload_offsets(files):
        """
        Record the current read offset of every file object in a requests style "files" dictionary, so an upload can
        be restarted from the same place if an attempt fails part way through.

        @type  files: dict
        @param files: Optional dictionary of files for multipart post.

        @rtype:  list
        @return: List of (file object, offset) tuples, entries that do not support tell() are left out.
        """

        offsets = []

        for entry in (files or {}).values():
            # requests accepts either a bare file object or a (filename, file object, ...) tuple.
            if isinstance(entry, (tuple, list)):
                if len(entry) < 2:
                    continue
                handle = entry[1]
            else:
                handle = entry

            try:
                offsets.append((handle, handle.tell()))
            except (AttributeError, IOError, OSError, ValueError):
                # not a seekable file object (ex: a plain string payload), nothing to rewind.
                continue

        return offsets

    ####################################################################################################################
    @staticmethod
    def _decode(raw):
        """
        Normalize a response body to text. requests exposes "content" as bytes, which never compares equal to a text
        literal on Python 3.

        @type  raw: bytes or str
        @param raw: Raw response body.

        @rtype:  str
        @return: Body decoded as UTF-8 (undecodable bytes are replaced), text input is returned untouched.
        """

        if isinstance(raw, bytes):
            return raw.decode("utf-8", "replace")

        return raw

    ####################################################################################################################
    @staticmethod
    def _flag(value):
        """
        Convert a truth value into the "1" / "0" strings sent as API flags.
        """

        return "1" if value else "0"

    ####################################################################################################################
    @classmethod
    def _json(cls, response):
        """
        Parse the body of a response as JSON.

        @raise ValueError: If the body is not valid JSON.
        """

        return json.loads(cls._decode(response.content))

    ####################################################################################################################
    def __API(self, api, params=None, files=None):
        """
        Robustness wrapper. Tries up to MAX_ATTEMPTS (3) times to dance with the Joe Sandbox API.

        @type  api:    str
        @param api:    API to call.
        @type  params: dict
        @param params: Optional parameters for API, the dictionary is not modified.
        @type  files:  dict
        @param files:  Optional dictionary of files for multipart post.

        @rtype:  requests.response.
        @return: Response object.
        @raise Exception: If all attempts failed.
        """

        data = self._merge_params(params)
        offsets = self._upload_offsets(files)

        # make up to three attempts to dance with the API, use a jittered exponential back-off delay.
        for attempt in range(self.MAX_ATTEMPTS):
            # a failed attempt may have consumed part of an upload, restart it from where it originally began.
            if attempt:
                for handle, offset in offsets:
                    handle.seek(offset)

            try:
                return requests.post(API_URL + api,
                                     data=data,
                                     files=files,
                                     verify=self.verify_ssl,
                                     timeout=self.timeout)

            # "Exception" rather than a bare except, so Ctrl-C and sys.exit() are not swallowed and retried.
            except Exception:
                # sleep a random duration of up to 0.1, 0.4, ... seconds. no point in waiting after the last attempt.
                if attempt < self.MAX_ATTEMPTS - 1:
                    time.sleep(random.uniform(0, 4 ** attempt * 100 / 1000.0))

        raise Exception("exceeded %d attempts with Joe sandbox API." % self.MAX_ATTEMPTS)

    ####################################################################################################################
    def analyses(self):
        """
        Retrieve a list of analyzed samples.

        @rtype:  list
        @return: List of objects referencing each analyzed file.
        @raise ValueError: If the response is not valid JSON.
        """

        # the API answers with a JSON list, which parses directly.
        return self._json(self.__API("analysis/list"))

    ####################################################################################################################
    def analyze(self, handle, systems="xp", inet=True, scae=False, comments=""):
        """
        Submit a file for analysis.

        @type  handle:   File handle
        @param handle:   Handle to file to upload for analysis, must be seekable and expose a "name" attribute.
        @type  systems:  str
        @param systems:  Comma separated list of systems to run sample on.
        @type  inet:     bool
        @param inet:     Raise this flag to allow Internet connectivity for sample.
        @type  scae:     bool
        @param scae:     Raise this flag to enable Static Code Analysis Engine.
        @type  comments: str
        @param comments: Comments to store with sample entry.

        @rtype:  dict
        @return: Dictionary of system identifier and associated webid ("FAIL" where no webid could be parsed).
        """

        # query parameters shared by every submission.
        base_params = \
        {
            "tandc"    : "1",
            "inet"     : self._flag(inet),
            "scae"     : self._flag(scae),
            "type"     : "file",
            "comments" : comments,
        }

        # multipart post files.
        files = { "sample" : (handle.name, handle) }

        # keep a list of webids per system we send the sample to.
        webids = {}

        # a separate API request must be made per system we wish to analyze on. tolerate "xp, w7" style spacing.
        for system in [name.strip() for name in systems.split(",") if name.strip()]:
            # ensure the handle is at offset 0.
            handle.seek(0)

            # set the system on a private copy of the parameters and dance with the API.
            params = dict(base_params)
            params[system] = "1"
            response = self.__API("analysis", params, files)

            # the raw answer is the only diagnostic available when a submission ends up as "FAIL" below.
            print(response.content)

            # save the webid for the current system.
            try:
                webids[system] = self._json(response)["webid"]
            except (ValueError, KeyError, TypeError):
                webids[system] = "FAIL"

        # return the webid list.
        return webids

    ####################################################################################################################
    def is_available(self):
        """
        Determine if the Joe Sandbox API servers are alive or in maintenance mode.

        @rtype:  bool
        @return: True if service is available, False otherwise.
        """

        response = self.__API("server/available")

        return self._decode(response.content).strip() == "1"

    ####################################################################################################################
    def delete(self, webid):
        """
        Delete the reports associated with the given webid.

        @type  webid: int
        @param webid: Report ID to delete.

        @rtype:  bool
        @return: True on success, False otherwise.
        """

        response = self.__API("analysis/delete", {"webid" : webid})

        return self._decode(response.content).strip() == "1"

    ####################################################################################################################
    def queue_size(self):
        """
        Determine Joe sandbox queue length.

        @rtype:  int
        @return: Number of submissions in sandbox queue.
        @raise ValueError: If the response is not a number.
        """

        response = self.__API("queue/size")

        return int(self._decode(response.content))

    ####################################################################################################################
    def report(self, webid, resource="json", run=0):
        """
        Retrieves the specified report for the analyzed item, referenced by webid. Available resource types include:
        html, xml, json, bins, pcap, shoots, memstrings, binstrings, memdumps, sample, and cookbook.

        @type  webid:    int
        @param webid:    Report ID to draw from.
        @type  resource: str
        @param resource: Resource type.
        @type  run:      int
        @param run:      Index into list of supplied systems to retrieve report from.

        @rtype:  dict
        @return: Dictionary representing the JSON parsed data or raw, for other formats / JSON parsing failure.
        """

        resource = resource.lower()
        params = \
        {
            "webid" : webid,
            "type"  : resource,
            "run"   : run,
        }

        # dance with the API.
        response = self.__API("analysis/download", params)

        # if resource is JSON, convert to container and return the head reference "analysis".
        if resource == "json":
            try:
                return self._json(response)["analysis"]
            except (ValueError, KeyError, TypeError):
                # not the expected JSON document, fall through and hand back the raw body.
                pass

        # otherwise, return the raw (possibly binary) content.
        return response.content

    ####################################################################################################################
    def search(self, query):
        """
        Search the analyses for entries matching a regular expression.

        @type  query: str
        @param query: Search query, sent to the API as the regular expression /query/.

        @rtype:  list
        @return: List of objects describing the matching results.
        @raise ValueError: If the response is not valid JSON.
        """

        # the API answers with a JSON list, which parses directly.
        return self._json(self.__API("analysis/search", {"regex" : "/" + query + "/"}))

    ####################################################################################################################
    def systems(self):
        """
        Retrieve a list of available systems.

        @rtype:  list
        @return: List of objects describing available analysis systems.
        @raise ValueError: If the response is not valid JSON.
        """

        # the API answers with a JSON list, which parses directly.
        return self._json(self.__API("server/systems"))
########################################################################################################################
if __name__ == "__main__":
    def USAGE ():
        """
        Print the command line synopsis and exit with status 1.
        """

        msg = "%s: <analyses | analyze <fh> | available | delete <id> | queue | report <id> | search <term> | systems>"
        print(msg % sys.argv[0])
        sys.exit(1)

    # raw_input() from Python 2 was renamed to input() in Python 3, pick whichever exists.
    try:
        prompt = raw_input
    except NameError:
        prompt = input

    # only the command is case-insensitive. the argument is kept verbatim, as lower-casing it would corrupt file paths
    # on case-sensitive filesystems and alter search terms.
    if len(sys.argv) == 2:
        cmd = sys.argv[1].lower()
        arg = None
    elif len(sys.argv) == 3:
        cmd = sys.argv[1].lower()
        arg = sys.argv[2]
    else:
        USAGE()

    # read in username, password and agree to terms and conditions.
    if JOE_USER:
        username = JOE_USER
    else:
        username = prompt("Joe Sandbox Username: ")

    if JOE_PASS:
        password = JOE_PASS
    else:
        password = getpass.getpass("Joe Sandbox Password: ")

    if JOE_TAC:
        tac = "yes"
    else:
        tac = prompt("Do you agree to the terms and conditions (yes/no)? ")

    if tac.lower() != "yes":
        sys.exit(1)

    # instantiate Joe Sandbox API interface.
    joe = joe_api(username, password)

    # process command line arguments. commands are matched by substring, so "system" also accepts "systems".
    if "analyses" in cmd:
        for a in joe.analyses():
            # a single pre-formatted argument prints identically under Python 2 and Python 3.
            print("%s %s %s %s %s" % (a["webid"], a["status"], a["systems"], a["filename"], a["detections"]))

    elif "analyze" in cmd:
        if arg is None:
            USAGE()
        else:
            with open(arg, "rb") as handle:
                print(joe.analyze(handle, systems="xp,w7"))

    elif "available" in cmd:
        print(joe.is_available())

    elif "delete" in cmd:
        if arg is None:
            USAGE()
        else:
            print(joe.delete(arg))

    elif "queue" in cmd:
        print(joe.queue_size())

    elif "report" in cmd:
        if arg is None:
            USAGE()
        else:
            print(joe.report(arg))

    elif "search" in cmd:
        if arg is None:
            USAGE()
        else:
            for r in joe.search(arg):
                print(r)

    elif "system" in cmd:
        print(",".join([s["name"] for s in joe.systems()]))

    else:
        USAGE()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment