Skip to content

Instantly share code, notes, and snippets.

@rc-abodkins
Last active July 5, 2021 18:06
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save rc-abodkins/3afe5062c46100717a84039b5546d285 to your computer and use it in GitHub Desktop.
Save rc-abodkins/3afe5062c46100717a84039b5546d285 to your computer and use it in GitHub Desktop.
This script searches for binaries within VMware Carbon Black EDR and exports the results to a CSV file.
import argparse
import os
import configparser
import csv
import sys
from os.path import exists
import requests
# ANSI escape sequences used to colour console output, so that errors and
# warnings are easy to tell apart from ordinary status messages.
err_Col = "\033[91m"      # errors (bright red)
success_Col = "\033[92m"  # success / progress messages (bright green)
warn_Col = "\033[93m"     # warnings and interactive prompts (bright yellow)
no_col = "\033[0m"        # reset the terminal back to its default colour
# Bird ASCII-art banner printed at start-up.
# A raw string is used so the backslashes in the artwork are taken literally;
# in a normal string they form invalid escape sequences (for example
# backslash-space), which raise SyntaxWarning on Python 3.12 and later.
birb = r"""
_.----._
,'.::.--..:._
/::/_,-<o)::;_`-._
::::::::`-';'`,--`-`
;::;'|::::,','
,'::/ ;:::/, :.
/,':/ /::;' \ ':\
:'.:: ,-'' . `.::\
\.:;':. ` :: .:
(;' ;;; .::' :|
\,:;; \ `::.\.\
`);' '::' `:
\. ` `' .: _,'
`.: .. -. ' :. :/ _.-' _.-
>;._.:._.;,-=_(.-' __ `._
,;' _..-(((('' .,-'' `-._
_,'<.-'' _..``'.'`-'`. `
_.-((((_..--'' \ \ `.`.
-' _.``' \ `
--------------------------------------------------
"""
# Title banner printed below the bird.
# Declared as a raw string because the artwork is full of backslashes that
# would otherwise be parsed as invalid escape sequences (SyntaxWarning on
# Python 3.12 and later); the resulting text is unchanged.
binary_hunt_ascii = r"""
____ _ _ _ _ _ _ _ _
| __ )(_)_ __ __ _ _ __ _ _ / \ _ __ __ _| |_ _ ___(_)___ | | | |_ _ _ __ | |_(_)_ __ __ _
| _ \| | '_ \ / _` | '__| | | | / _ \ | '_ \ / _` | | | | / __| / __| | |_| | | | | '_ \| __| | '_ \ / _` |
| |_) | | | | | (_| | | | |_| | / ___ \| | | | (_| | | |_| \__ \ \__ \ | _ | |_| | | | | |_| | | | | (_| |
|____/|_|_| |_|\__,_|_| \__, | /_/ \_\_| |_|\__,_|_|\__, |___/_|___/ |_| |_|\__,_|_| |_|\__|_|_| |_|\__, |
|___/ |___/ |___/
"""
def credential_file():
    """Locate a ``credentials.response`` file in the standard locations.

    The candidates are checked from the most general to the most specific:

    1. ``/etc/carbonblack/credentials.response``
    2. ``.carbonblack/credentials.response`` next to this script
    3. ``.carbonblack/credentials.response`` in the current working directory

    Returns:
        str: The path of the most specific candidate that exists, or an empty
        string when none of them exists.
    """
    # Resolve the script directory from an absolute path so the candidate is
    # still correct when __file__ is relative (its dirname would then be "").
    script_dir = os.path.dirname(os.path.abspath(__file__))
    candidates = [
        "/etc/carbonblack/credentials.response",
        os.path.join(script_dir, ".carbonblack", "credentials.response"),
        os.path.join(os.getcwd(), ".carbonblack", "credentials.response"),
    ]

    # Initialise once, outside the loop: resetting it on every iteration would
    # discard a match from an earlier candidate whenever a later one is absent.
    location = ""
    for candidate in candidates:
        if exists(candidate):
            location = candidate
    return location
def config_reader(file_location):
    """Load an INI-style credentials file.

    Args:
        file_location: Path of the file to parse.

    Returns:
        configparser.ConfigParser: The populated parser. A file that cannot
        be opened is skipped by ``ConfigParser.read``, so the parser is
        simply empty in that case.
    """
    parser = configparser.ConfigParser()
    parser.read(file_location)
    return parser
def get_binaries(base_url, auth_token, query="*", rows="100000"):
    """Run a binary search against the Carbon Black server.

    Args:
        base_url: Server URL without a trailing slash.
        auth_token: API token, sent in the ``X-Auth-Token`` header.
        query: Search query, passed un-encoded. Defaults to ``"*"``.
        rows: Maximum number of results to request. Defaults to ``"100000"``.

    Returns:
        requests.Response: The response of a successful (HTTP 200) search.

    Raises:
        SystemExit: When the server answers with any status other than 200.
    """
    # Let requests build the query string: formatting the query straight into
    # the URL breaks as soon as it contains characters such as "&", "#" or "+".
    response = requests.get(
        f"{base_url}/api/v1/binary",
        params={"q": query, "rows": rows},
        headers={"X-Auth-Token": f"{auth_token}"},
    )
    if response.status_code != 200:
        sys.exit(
            err_Col
            + f"[!][!] We encountered a problem with the query to Carbon Black. The status code returned was {response.status_code}"
            + no_col
        )
    return response
def parse_results(results, base_url, subdomain):
    """Flatten a binary-search response into rows ready for the CSV export.

    Args:
        results: Response object whose ``json()`` returns a mapping with a
            ``"results"`` list of binary documents.
        base_url: Server URL, used to build a link to each binary.
        subdomain: Label stored in the ``shortname`` column of every row.

    Returns:
        list[dict]: One dict per binary, keyed by the CSV column names.

    Raises:
        SystemExit: When a binary document has no ``md5`` or is malformed.
    """
    rows = []
    for binary in results.json()["results"]:
        try:
            # The md5 identifies the binary, so it is the only mandatory key.
            md5 = binary["md5"]
            # Every other field is optional; fall back to an empty value so
            # one sparse document does not abort the whole export.
            observed = binary.get("observed_filename") or []
            rows.append({
                "shortname": subdomain,
                "md5": md5,
                "signature_status": binary.get("signed", ""),
                "company_name": binary.get("company_name", ""),
                # One path per line, with doubled backslashes collapsed.
                "observed_filename": "".join(
                    name.replace("\\\\", "\\") + "\n" for name in observed
                ),
                "count_observed_filename": len(observed),
                "original_filename": binary.get("original_filename", ""),
                "internal_name": binary.get("internal_name", ""),
                "file_desc": binary.get("file_desc", ""),
                "server_added_timestamp": binary.get("server_added_timestamp", ""),
                "digsig_publisher": binary.get("digsig_publisher", ""),
                "os_type": binary.get("os_type", ""),
                "host_count": binary.get("host_count", ""),
                "is_executable_image": binary.get("is_executable_image", ""),
                "url": f"{base_url}/#/binary/{md5}",
            })
        except (AttributeError, KeyError, TypeError) as e:
            sys.exit(err_Col + f"[!] We encountered an issue trying to parse out your results. This is the exception {e}." + no_col)
    return rows
def write_csv(output, results):
    """Write the parsed binaries to a CSV file.

    Args:
        output: Path of the CSV file to create; an existing file is
            overwritten.
        results: Iterable of row dicts keyed by the column names below.
    """
    columns = [
        'shortname',
        'md5',
        'signature_status',
        'company_name',
        'observed_filename',
        'count_observed_filename',
        'original_filename',
        'internal_name',
        'file_desc',
        'server_added_timestamp',
        'digsig_publisher',
        'os_type',
        'host_count',
        'is_executable_image',
        'url',
    ]
    # newline='' hands line-ending control to the csv module.
    with open(output, mode='w', newline='', encoding='UTF8') as csv_file:
        dict_writer = csv.DictWriter(csv_file, fieldnames=columns)
        dict_writer.writeheader()
        for record in results:
            dict_writer.writerow(record)
def main():
    """Command-line entry point: query Carbon Black and export binaries to CSV.

    Credentials are resolved in this order:

    1. the credential file given with ``--config``;
    2. ``--api`` together with ``--url``;
    3. a credential file found by ``credential_file()``;
    4. interactive prompts for whichever of the API key / URL is missing.

    When a credential file is used, the section to read is taken from
    ``--subdomain`` or prompted for.

    Raises:
        SystemExit: When no usable credentials can be determined or the
            search does not succeed.
    """
    # The colour codes are module-level names that the helper functions read
    # too, so --no-color has to rebind the globals, not function locals.
    global err_Col, success_Col, warn_Col, no_col

    parser = argparse.ArgumentParser(description="Script to hunt through binaries of a single Carbon Black Response customer.")
    parser.add_argument("--subdomain", "--d", type=str, required=False, help="Subdomain you would like to hunt through")
    parser.add_argument("--config", "--c", type=str, required=False, help="location of .ini file with the credentials required.")
    parser.add_argument("--output", "--o", required=False, help="Where do you want to store the CSV file of results?")
    parser.add_argument("--query", "--q", required=False, help="Specify a specific query you'd like to search. By default it'll return everything.")
    parser.add_argument("--rows", "--r", required=False, help="Specify the number of rows you'd like outputted. By default it'll return up to 100,000.")
    parser.add_argument("--no-color", action='store_true')
    parser.add_argument("--api", required=False, help="API Key for your Carbon Black Instance")
    parser.add_argument("--url", required=False, help="URL for your Carbon Black Response server")
    args = parser.parse_args()

    if args.no_color:
        # Blank out every code, including the reset, so no escape sequence
        # reaches the output at all.
        err_Col = success_Col = warn_Col = no_col = ""

    # Display the ASCII art banners.
    print(err_Col + birb)
    print(success_Col + binary_hunt_ascii + no_col)

    # Work out where the credentials come from.
    cred_file = ""
    api = ""
    url = ""
    if args.config:
        cred_file = args.config
    elif args.api and args.url:
        # Explicit command-line credentials win over an auto-discovered file.
        api, url = args.api, args.url
    else:
        cred_file = credential_file()
        if not cred_file:
            print(warn_Col + "[!] We couldn't find a credential .ini file in any standard location.")
            api = args.api or input(warn_Col + "Please enter a valid API key: " + no_col)
            url = args.url or input(warn_Col + "Please enter a valid URL for your CB server (no trailing /): " + no_col)

    # A subdomain only matters when a credential file is read.
    if not cred_file:
        subdomain = ""
    elif args.subdomain:
        subdomain = args.subdomain
    else:
        subdomain = input(warn_Col + "[!] No subdomain provided. Please enter a subdomain that is included in your credential file: " + no_col)

    # Decide where the CSV goes.
    if args.output:
        output = args.output
    else:
        output = os.getcwd() + "/binary_analysis.csv"
        print(warn_Col + "[*] No output file provided. The output file will be in the current working directory and named binary_analysis.csv" + no_col)

    if cred_file:
        # The file must contain a section for the subdomain holding both keys.
        config = config_reader(cred_file)
        try:
            base_url = config[subdomain]['url']
            auth_token = config[subdomain]['token']
        except KeyError:
            sys.exit(err_Col + "[!][!] The subodmain you provided is not in the credentials.response file. Please add it." + no_col)
    else:
        if not (api.strip() and url.strip()):
            sys.exit(err_Col + "[!][!] Both an API key and a server URL are required when no credential file is used." + no_col)
        base_url = url
        auth_token = api

    # The request path is appended to base_url, so drop any trailing slash.
    base_url = base_url.strip().rstrip("/")
    auth_token = auth_token.strip()

    # Only forward the options the user supplied; get_binaries has defaults.
    search_options = {}
    if args.query:
        search_options["query"] = args.query
    if args.rows:
        search_options["rows"] = args.rows

    print(success_Col + "[*] Getting list of binaries from Carbon Black")
    binaries = get_binaries(base_url, auth_token, **search_options)
    if not binaries:
        sys.exit(err_Col + "We've encountered a problem.")

    result_count = len(binaries.json()['results'])
    print(success_Col + f"[*] There were a total of {result_count} results returned")
    print(success_Col + f"[*] Parsing the results to {output}")
    parsed_results = parse_results(binaries, base_url, subdomain)
    write_csv(output, parsed_results)
# Run the command-line workflow only when this file is executed as a script,
# using the return value of main() as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment