Skip to content

Instantly share code, notes, and snippets.

@moohax
Last active August 13, 2021 23:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save moohax/0ff0593691d3fab4877a14aae893938b to your computer and use it in GitHub Desktop.
Save moohax/0ff0593691d3fab4877a14aae893938b to your computer and use it in GitHub Desktop.
# Generated by counterfit #
# code modified from https://github.com/monoxgas/FlyingAFalseFlag/blob/256197b78a8140d15df6e18b3221b637b5c3490a/Addendum/addendum.py
import os
import re
import json
import time
import requests
import numpy as np
import tqdm
import uuid
from zipfile import ZipFile
from counterfit.core import config
from counterfit.core.targets import ArtTarget
# Don't forget to add your headers and login information #
# Sometimes it will not require the X-Recaptcha-Response header. If it is not available in the headers, comment it out.
class Virustotal(ArtTarget):
model_name = "virustotal"
model_data_type = "pe"
model_endpoint = "https://virustotal.com"
model_input_shape = (1,)
model_output_classes = [0, 1]
X = []
def __init__(self):
# Data Information
self.zip_info = []
self.encryption_password = b'infected'
self.sample_endpoint = 'https://mlsec.io/static/MLSEC_2021_malware.zip'
self.sample_input_path = f"{config.targets_path}/{self.model_name}/mlsec_malware_samples.zip"
# Download the samples
if not os.path.exists(self.sample_input_path):
self.download_samples()
print(
f"\n[-] scanning malware sample info from {self.sample_input_path}\n")
with ZipFile(self.sample_input_path) as thezip:
thezip.setpassword(self.encryption_password)
for zipinfo in tqdm.tqdm(thezip.infolist()):
self.zip_info.append(zipinfo)
# Load samples into X
self.X = self.read_input_data()
# VT Information
self.session = requests.session()
self.scantime = 90
self.headers = {
"Accept": "application/json",
"Accept-Ianguage": "en-US,en;q=0.9,es;q=0.8",
"X-Tool": "vt-ui-main",
"X-App-Version": "v1x37x1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36 Edg/92.0.902.67",
"X-Recaptcha-Response": "",
"X-Vt-Anti-Abuse-Header": "",
}
if not self.login('', ''):
print('[+] Login failed!')
return
self.headers["X-Session-Hash"] = self.session.cookies['VT_SESSION_HASH']
def login(self, username, password):
login_json = {
"data": {
"forever": True,
"password": password,
"user_id": username
}
}
login_response = self.session.post(
f"{self.model_endpoint}/ui/signin", headers=self.headers, json=login_json)
print(login_response)
return (login_response.status_code == 200)
def download_samples(self):
response = requests.get(self.sample_endpoint)
if response.ok:
print(
f"\n[+] Successfully downloaded malware samples to {self.sample_input_path}\n")
with open(self.sample_input_path, "wb") as samples:
samples.write(response.content)
else:
print(
f"\n[+] Failed to download malware samples: {response.status_code}\n")
def read_input_data(self):
out = []
with ZipFile(self.sample_input_path) as thezip:
for i in range(len(self.zip_info)):
with thezip.open(self.zip_info[i], pwd=self.encryption_password) as thefile:
out.append(thefile.read())
return out
def upload_file(self, file_name, file_data):
upload_url_response = requests.get(
f"{self.model_endpoint}/ui/files/upload_url", headers=self.headers)
try:
upload_url = upload_url_response.json()['data']
except:
print('[!] Failed to get upload URL')
return False
files = {
'file': (file_name, file_data),
'filename': (None, file_name)
}
upload_response = requests.post(
upload_url, files=files, headers=self.headers)
try:
file_id = upload_response.json()['data']['id']
print(f'[+] File id is: {file_id}')
except:
print('[!] Failed to post file')
return False
return file_id
def get_file_analysis(self, file_id):
response = requests.get(
f"{self.model_endpoint}/ui/analyses/{file_id}", headers=self.headers)
try:
status = response.json()['data']['attributes']['status']
stats = response.json()['data']['attributes']['stats']
results = response.json()['data']['attributes']['results']
except:
status, stats, results = None, None, None
return status, stats, results
def set_attack_samples(self, index=0):
# JIT loading of samples
if hasattr(index, "__iter__"):
# list of indices
to_fetch = index
else:
to_fetch = [index]
out = []
for i in to_fetch:
out.append(self.X[i])
self.active_attack.sample_index = index
self.active_attack.samples = out
def __call__(self, x):
scores = []
for sample in x:
fname = uuid.uuid4().hex
sample_id = self.upload_file(fname, x[0])
time_left = self.scantime
while time_left:
status, stats, results = self.get_file_analysis(sample_id)
if 'completed' not in status or not results:
time.sleep(15)
time_left -= 15
else:
print(stats)
break
if not time_left:
print('[!] Timeout hit waiting for analysis to finish')
scores.append([0.0, 1.0])
continue
else:
try:
score = float(re.findall(
r'(\d+)', results['MAX']['result'])[0])/100
print(score)
scores.append([1-score, score])
except:
print('[!] Failed to get score!')
scores.append([0.0, 1.0])
return np.array(scores)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment