Skip to content

Instantly share code, notes, and snippets.

@klingerko
Created August 27, 2021 08:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save klingerko/50e62c3eb0dbee7b7e3d6a6f2e138043 to your computer and use it in GitHub Desktop.
Save klingerko/50e62c3eb0dbee7b7e3d6a6f2e138043 to your computer and use it in GitHub Desktop.
import requests
import json
import sys
import time
# Create your API token with:
#   curl -d "username=<USER>&password=<PASSWD>" https://capesandbox.com/apiv2/api-token-auth/
headers = {"Authorization": "Token <INSERT_TOKEN>"}

# Quick check against the status endpoint: confirms that the API token works
# and that the API can be reached before any real work is done. Any failure
# here aborts the script with a non-zero exit status.
try:
    response = requests.get(
        "https://www.capesandbox.com/apiv2/cuckoo/status/",
        headers=headers,
        timeout=30,  # without a timeout an unresponsive server blocks forever
    )
except requests.RequestException as exc:
    print(f"[ERROR] api endpoint could not be reached: {exc}", file=sys.stderr)
    sys.exit(1)
time.sleep(2)  # sleep 2 seconds, we don't want to break anything

if response.status_code != 200:
    # e.g. a rejected token; continuing would only produce silent failures later
    print(
        f"[ERROR] api endpoint could not be reached (HTTP {response.status_code})",
        file=sys.stderr,
    )
    sys.exit(1)

try:
    response_dict = json.loads(response.content)
except ValueError:
    print("[ERROR] api endpoint returned a non-JSON response", file=sys.stderr)
    sys.exit(1)

# A missing "error" field is treated as an error, exactly like an explicit one.
if response_dict.get("error", True):
    print(
        "[ERROR] api endpoint could not be reached: "
        + str(response_dict.get("error_value", "unknown error")),
        file=sys.stderr,
    )
    sys.exit(1)
# Let's start with an easy malfamily (Remcos), we can add more in the future
# List of config extractors: https://github.com/kevoreilly/CAPEv2/tree/master/modules/processing/parsers
#
# Collect the IDs of all tasks whose detection is exactly "Remcos".
remcos_task_list = []
data = {
    "option": "detections",
    "argument": "Remcos",
}
try:
    response = requests.post(
        "https://www.capesandbox.com/apiv2/tasks/extendedsearch/",
        headers=headers,
        data=data,
        timeout=30,  # without a timeout an unresponsive server blocks forever
    )
except requests.RequestException as exc:
    response = None
    print(f"[ERROR] task search failed: {exc}", file=sys.stderr)
time.sleep(2)  # sleep 2 seconds, we don't want to break anything

if response is not None and response.status_code == 200:
    try:
        response_dict = json.loads(response.content)
    except ValueError:
        response_dict = {}
        print("[ERROR] task search returned a non-JSON response", file=sys.stderr)
    for task in response_dict.get("data") or []:
        task_id = task.get("info", {}).get("id", "")
        # the search may return other detections too; keep exact matches only
        if task_id and task.get("detections", "") == "Remcos":
            remcos_task_list.append(task_id)
elif response is not None:
    print(f"[ERROR] task search failed (HTTP {response.status_code})", file=sys.stderr)
# Go through the Remcos task list and collect the C2 endpoints of every
# extracted config into remcos_config_list.
def _first_or_self(value):
    """Return the first item of a list/tuple ("" when empty), else *value* itself."""
    if isinstance(value, (list, tuple)):
        return value[0] if value else ""
    return value


def _parse_remcos_config(raw_config):
    """Extract the C2 endpoints from a single Remcos config.

    Returns a list of ``{"Version": ..., "C2": ..., "Port": ...}`` dicts. The
    list is empty when the config is empty, is not a dict, or has no version.
    Diagnostics go to stderr so that stdout carries nothing but rules.
    """
    if not isinstance(raw_config, dict):
        return []
    # not all configs look the same (of course), so normalize the key case
    remcos_config = {k.lower(): v for k, v in raw_config.items()}
    if not remcos_config:
        return []

    # The version is read as the first list element; a plain string is taken
    # whole instead of being truncated to its first character.
    version = _first_or_self(remcos_config.get("version", ""))
    if not version:
        print("[ERROR] Config found without version", file=sys.stderr)
        return []

    endpoints = []

    # "control" entries are expected in the form tcp://<host>:<port>
    control = remcos_config.get("control") or []
    if isinstance(control, str):
        control = [control]
    for c in control:
        if not c:
            continue
        parts = []
        if isinstance(c, str) and c.startswith("tcp://"):
            parts = c.replace("tcp://", "").split(":")
        if len(parts) == 2:
            endpoints.append({"Version": version, "C2": parts[0], "Port": parts[1]})
        else:
            print("[ERROR] Unknown control pattern", file=sys.stderr)

    # "domains" is read as a list of lists of dicts; a bare dict is accepted
    # as a one-element group and anything else is skipped.
    for d1 in remcos_config.get("domains") or []:
        if isinstance(d1, dict):
            d1 = [d1]
        elif not isinstance(d1, (list, tuple)):
            continue
        for d2 in d1:
            if not isinstance(d2, dict):
                continue
            # NOTE(review): the host key really is "c2:" (with the colon) in
            # the original script - presumably mirrors the parser output; confirm.
            c2 = d2.get("c2:", "")
            port = d2.get("port", "")
            if c2 and port:
                endpoints.append({"Version": version, "C2": c2, "Port": port})

    return endpoints


remcos_config_list = []
for entry in remcos_task_list:
    url = "https://www.capesandbox.com/apiv2/tasks/get/config/" + str(entry) + "/"
    try:
        # without a timeout an unresponsive server blocks forever
        response = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException as exc:
        response = None
        print(f"[ERROR] could not fetch config of task {entry}: {exc}", file=sys.stderr)
    time.sleep(2)  # sleep 2 seconds, we don't want to break anything
    if response is None or response.status_code != 200:
        continue

    try:
        response_dict = json.loads(response.content)
    except ValueError:
        print(f"[ERROR] non-JSON config response for task {entry}", file=sys.stderr)
        continue

    # A missing "error" field is treated like an error: config not available.
    if response_dict.get("error", True):
        continue

    for config in response_dict.get("configs") or []:
        if isinstance(config, dict):
            remcos_config_list.extend(_parse_remcos_config(config.get("Remcos") or {}))
# Now we want to create Suricata rules finally: one rule per distinct
# (version, C2, port) combination, printed to stdout.
def _suricata_msg(value):
    """Backslash-escape the characters Suricata reserves inside ``msg``."""
    text = str(value)
    for char in ("\\", '"', ";"):  # backslash first so escapes are not re-escaped
        text = text.replace(char, "\\" + char)
    return text


def _suricata_content(value):
    """Hex-encode (``|XX|``) the characters Suricata reserves inside ``content``."""
    reserved = '";:|\\'
    return "".join(
        f"|{ord(char):02X}|" if char in reserved else char for char in str(value)
    )


# The values come from sandbox-extracted malware configs, so they are escaped
# before being placed inside the quoted rule options.
# NOTE(review): sid:0 is kept as in the original output - presumably a
# placeholder to be replaced with unique signature IDs; confirm before loading.
emitted_rules = set()
for entry in remcos_config_list:
    version = _suricata_msg(entry.get("Version"))
    c2 = _suricata_content(entry.get("C2"))
    port = entry.get("Port")
    rule = (
        f"alert tcp $HOME_NET any -> $EXTERNAL_NET {port} "
        f"(msg:\"ET MALWARE Remcos RAT (Version {version}) "
        f"C2 Communication - CAPE sandbox config extraction\"; "
        f"flow:established,to_server; "
        f"content:\"{c2}\"; fast_pattern; sid:0; rev:1;)"
    )
    # Several tasks often share one C2; emit each distinct rule only once.
    if rule in emitted_rules:
        continue
    emitted_rules.add(rule)
    print(rule)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment