Created
August 27, 2021 08:50
-
-
Save klingerko/50e62c3eb0dbee7b7e3d6a6f2e138043 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
import sys | |
import time | |
# create your api token with: curl -d "username=<USER>&password=<PASSWD>" https://capesandbox.com/apiv2/api-token-auth/ | |
headers = {"Authorization": "Token <INSERT_TOKEN>"} | |
# quick check for status api endpoint to see if api token works and we can reach the api | |
response = requests.get("https://www.capesandbox.com/apiv2/cuckoo/status/", headers=headers) | |
time.sleep(2) # sleep 2 seconds, we don't want to break anything | |
if response and response.status_code == 200: | |
response_dict = json.loads(response.content) | |
if response_dict.get("error", True): | |
print("[ERROR] api endpoint could be reached") | |
sys.exit() | |
# Let's start with an easy malfamily (Remcos), we can add more in the future | |
# List of config extractors: https://github.com/kevoreilly/CAPEv2/tree/master/modules/processing/parsers | |
remcos_task_list = [] | |
data = { | |
"option": "detections", | |
"argument": "Remcos", | |
} | |
response = requests.post("https://www.capesandbox.com/apiv2/tasks/extendedsearch/", headers=headers, data=data) | |
time.sleep(2) # sleep 2 seconds, we don't want to break anything | |
if response and response.status_code == 200: | |
response_dict = json.loads(response.content) | |
task_list = response_dict.get("data", []) | |
for task in task_list: | |
if task.get("detections", "") == "Remcos" and task.get("info", {}).get("id", ""): | |
remcos_task_list.append(task.get("info", {}).get("id", "")) | |
# go through Remcos task list and get config | |
remcos_config_list = [] | |
for entry in remcos_task_list: | |
url = "https://www.capesandbox.com/apiv2/tasks/get/config/" + str(entry) + "/" | |
response = requests.get(url, headers=headers) | |
time.sleep(2) # sleep 2 seconds, we don't want to break anything | |
if response and response.status_code == 200: | |
response_dict = json.loads(response.content) | |
if response_dict.get("error", True): | |
# config not available | |
# print("[ERROR] " + response_dict.get("error_value", "unknown error")) | |
continue | |
else: | |
# config is available | |
for config in response_dict.get("configs", []): | |
remcos_config = config.get("Remcos", {}) | |
# not all configs look the same (of course) | |
remcos_config = dict((k.lower(), v) for k,v in remcos_config.items()) | |
if remcos_config: | |
version = remcos_config.get("version", "") | |
control = remcos_config.get("control", []) | |
domains = remcos_config.get("domains", []) | |
if not version: | |
print("[ERROR] Config found without version") | |
continue | |
else: | |
if control: | |
for c in control: | |
if c and c.startswith("tcp://"): | |
tmp = c.replace("tcp://", "").split(":") | |
if tmp and len(tmp) == 2: | |
remcos_config_list.append( | |
{ | |
"Version": version[0], | |
"C2": tmp[0], | |
"Port": tmp[1], | |
} | |
) | |
else: | |
print("[ERROR] Unknown control pattern") | |
if domains: | |
for d1 in domains: | |
for d2 in d1: | |
c2 = d2.get("c2:", "") | |
port = d2.get("port", "") | |
if c2 and port: | |
remcos_config_list.append( | |
{ | |
"Version": version[0], | |
"C2": c2, | |
"Port": port, | |
} | |
) | |
# Now we want to create Suricata rules finally | |
for object in remcos_config_list: | |
version = object.get("Version") | |
c2 = object.get("C2") | |
port = object.get("Port") | |
rule = f"alert tcp $HOME_NET any -> $EXTERNAL_NET {port} (msg:\"ET MALWARE Remcos RAT (Version {version}) " \ | |
f"C2 Communication - CAPE sandbox config extraction\"; flow:established,to_server; " \ | |
f"content:\"{c2}\"; fast_pattern; sid:0; rev:1;)" | |
print(rule) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment