MLSEC 2020: Need for Speed Malware Detection Model
# json_attribute_extractor.py
import json

class JSONAttributeExtractor():

    # initialize extractor
    def __init__(self, file):
        # save data
        self.data = json.loads(file)
        # attributes
        self.attributes = {}

    # extract string metadata
    def extract_string_metadata(self):
        return {
            'string_paths': self.data["strings"]["paths"],
            'string_urls': self.data["strings"]["urls"],
            'string_registry': self.data["strings"]["registry"],
            'string_MZ': self.data["strings"]["MZ"]
        }

    # extract attributes
    def extract(self):
        # get general info
        self.attributes.update({
            "size": self.data["general"]["size"],
            "virtual_size": self.data["general"]["vsize"],
            "has_debug": self.data["general"]["has_debug"],
            "imports": self.data["general"]["imports"],
            "exports": self.data["general"]["exports"],
            "has_relocations": self.data["general"]["has_relocations"],
            "has_resources": self.data["general"]["has_resources"],
            "has_signature": self.data["general"]["has_signature"],
            "has_tls": self.data["general"]["has_tls"],
            "symbols": self.data["general"]["symbols"],
        })
        # get header info
        self.attributes.update({
            "timestamp": self.data["header"]["coff"]["timestamp"],
            "machine": self.data["header"]["coff"]["machine"],
            "numberof_sections": len(self.data["section"]["sections"]),
            "characteristics_list": " ".join(self.data["header"]["coff"]["characteristics"])
        })
        # get optional header
        self.attributes.update({
            "dll_characteristics_list": " ".join(self.data["header"]["optional"]["dll_characteristics"]),
            "magic": self.data["header"]["optional"]["magic"],
            "major_image_version": self.data["header"]["optional"]["major_image_version"],
            "minor_image_version": self.data["header"]["optional"]["minor_image_version"],
            "major_linker_version": self.data["header"]["optional"]["major_linker_version"],
            "minor_linker_version": self.data["header"]["optional"]["minor_linker_version"],
            "major_operating_system_version": self.data["header"]["optional"]["major_operating_system_version"],
            "minor_operating_system_version": self.data["header"]["optional"]["minor_operating_system_version"],
            "major_subsystem_version": self.data["header"]["optional"]["major_subsystem_version"],
            "minor_subsystem_version": self.data["header"]["optional"]["minor_subsystem_version"],
            "sizeof_code": self.data["header"]["optional"]["sizeof_code"],
            "sizeof_headers": self.data["header"]["optional"]["sizeof_headers"],
            "sizeof_heap_commit": self.data["header"]["optional"]["sizeof_heap_commit"]
        })
        # get string metadata
        self.attributes.update(self.extract_string_metadata())
        # get imported libraries and functions; in the EMBER format,
        # "imports" maps each library name to its list of imported
        # functions, so the keys are the libraries and the flattened
        # values are the functions (matching PEAttributeExtractor below)
        self.libraries = " ".join(self.data["imports"].keys())
        self.functions = " ".join([item for sublist in self.data["imports"].values() for item in sublist])
        self.attributes.update({"functions": self.functions, "libraries": self.libraries})
        # get exports
        self.exports = " ".join(self.data["exports"])
        self.attributes.update({"exports_list": self.exports})
        # get label
        self.label = self.data["label"]
        self.attributes.update({"label": self.label})
        return self.attributes
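
For reference, here is a minimal usage sketch of the extractor. It is not part of the original gist: the JSON record below is a hypothetical, heavily abbreviated stand-in for a real EMBER feature record (real records contain many more fields), and it assumes JSONAttributeExtractor is in scope.

import json

# a minimal, hypothetical EMBER-style record covering only the
# fields that JSONAttributeExtractor.extract() reads
sample = json.dumps({
    "strings": {"paths": 2, "urls": 1, "registry": 0, "MZ": 1},
    "general": {"size": 4096, "vsize": 8192, "has_debug": 0,
                "imports": 2, "exports": 0, "has_relocations": 1,
                "has_resources": 1, "has_signature": 0, "has_tls": 0,
                "symbols": 0},
    "header": {
        "coff": {"timestamp": 0, "machine": "I386",
                 "characteristics": ["EXECUTABLE_IMAGE"]},
        "optional": {"dll_characteristics": [], "magic": "PE32",
                     "major_image_version": 0, "minor_image_version": 0,
                     "major_linker_version": 14, "minor_linker_version": 0,
                     "major_operating_system_version": 6,
                     "minor_operating_system_version": 0,
                     "major_subsystem_version": 6,
                     "minor_subsystem_version": 0,
                     "sizeof_code": 1024, "sizeof_headers": 512,
                     "sizeof_heap_commit": 4096}
    },
    "section": {"sections": []},
    "imports": {"KERNEL32.dll": ["CreateFileA", "ReadFile"]},
    "exports": [],
    "label": 1
})

atts = JSONAttributeExtractor(sample).extract()
print(atts["libraries"])   # KERNEL32.dll
print(atts["functions"])   # CreateFileA ReadFile
print(atts["label"])       # 1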
# need_for_speed_model.py
from copy import deepcopy
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier

# need for speed class
class NeedForSpeedModel():

    # numerical attributes
    NUMERICAL_ATTRIBUTES = [
        'string_paths', 'string_urls', 'string_registry', 'string_MZ', 'size',
        'virtual_size', 'has_debug', 'imports', 'exports', 'has_relocations',
        'has_resources', 'has_signature', 'has_tls', 'symbols', 'timestamp',
        'numberof_sections', 'major_image_version', 'minor_image_version',
        'major_linker_version', 'minor_linker_version', 'major_operating_system_version',
        'minor_operating_system_version', 'major_subsystem_version',
        'minor_subsystem_version', 'sizeof_code', 'sizeof_headers', 'sizeof_heap_commit'
    ]

    # categorical attributes
    CATEGORICAL_ATTRIBUTES = [
        'machine', 'magic'
    ]

    # textual attributes
    TEXTUAL_ATTRIBUTES = ['libraries', 'functions', 'exports_list',
                          'dll_characteristics_list', 'characteristics_list']

    # label
    LABEL = "label"

    # initialize NFS classifier
    def __init__(self,
                 categorical_extractor=OneHotEncoder(handle_unknown="ignore"),
                 textual_extractor=TfidfVectorizer(max_features=300),
                 feature_scaler=MinMaxScaler(),
                 classifier=RandomForestClassifier(n_estimators=100)):
        self.base_categorical_extractor = categorical_extractor
        self.base_textual_extractor = textual_extractor
        self.base_feature_scaler = feature_scaler
        self.base_classifier = classifier

    # append features to original features list
    def _append_features(self, original_features, appended):
        if original_features:
            for l1, l2 in zip(original_features, appended):
                for i in l2:
                    l1.append(i)
            return original_features
        else:
            return appended.tolist()

    # train a categorical extractor
    def _train_categorical_extractor(self, categorical_attributes):
        # initialize categorical extractor
        self.categorical_extractor = deepcopy(self.base_categorical_extractor)
        # train categorical extractor
        self.categorical_extractor.fit(categorical_attributes.values)

    # transform categorical attributes into features
    def _transform_categorical_attributes(self, categorical_attributes):
        # transform categorical attributes using categorical extractor
        cat_features = self.categorical_extractor.transform(categorical_attributes.values.tolist()).toarray()
        # return categorical features
        return cat_features.tolist()

    # train a textual extractor
    def _train_textual_extractor(self, textual_attributes):
        # initialize textual extractors
        self.textual_extractors = {}
        # train a feature extractor for each textual attribute
        for att in self.TEXTUAL_ATTRIBUTES:
            # initialize textual extractor
            self.textual_extractors[att] = deepcopy(self.base_textual_extractor)
            # train textual extractor
            self.textual_extractors[att].fit(textual_attributes[att].values)

    # transform textual attributes into features
    def _transform_textual_attributes(self, textual_attributes):
        # initialize features
        textual_features = None
        # extract features from each textual attribute
        for att in self.TEXTUAL_ATTRIBUTES:
            # transform textual attribute
            att_features = self.textual_extractors[att].transform(textual_attributes[att].values)
            # transform into array (when it is a sparse matrix)
            att_features = att_features.toarray()
            # append textual features
            textual_features = self._append_features(textual_features, att_features)
        return textual_features

    # train feature scaler
    def _train_feature_scaler(self, features):
        # initialize feature scaler
        self.feature_scaler = deepcopy(self.base_feature_scaler)
        # train feature scaler
        self.feature_scaler.fit(features)

    # transform features using feature scaler
    def _transform_feature_scaler(self, features):
        return self.feature_scaler.transform(features)

    # train classifier
    def _train_classifier(self, features, labels):
        # initialize classifier
        self.classifier = deepcopy(self.base_classifier)
        # train classifier
        self.classifier.fit(features, labels)

    # fit classifier using raw input
    def fit(self, train_data):
        # get labels
        train_labels = train_data[self.LABEL]
        # delete label column (note: this mutates the caller's DataFrame)
        del train_data[self.LABEL]
        # initialize train_features with numerical ones
        train_features = train_data[self.NUMERICAL_ATTRIBUTES].values.tolist()
        print("Training categorical features...")
        # train categorical extractor
        self._train_categorical_extractor(train_data[self.CATEGORICAL_ATTRIBUTES])
        # transform categorical data
        cat_train_features = self._transform_categorical_attributes(train_data[self.CATEGORICAL_ATTRIBUTES])
        # append categorical features to train_features
        train_features = self._append_features(train_features, cat_train_features)
        print("Training textual features...")
        # train textual extractor
        self._train_textual_extractor(train_data[self.TEXTUAL_ATTRIBUTES])
        # transform textual data
        tex_train_features = self._transform_textual_attributes(train_data[self.TEXTUAL_ATTRIBUTES])
        # append textual features to train_features
        train_features = self._append_features(train_features, tex_train_features)
        print("Normalizing features...")
        # train feature scaler
        self._train_feature_scaler(train_features)
        # transform features
        train_features = self._transform_feature_scaler(train_features)
        print("Training classifier...")
        # train classifier
        self._train_classifier(train_features, train_labels)
        return self

    # extract features from raw input
    def _extract_features(self, data):
        # initialize features with numerical ones
        features = data[self.NUMERICAL_ATTRIBUTES].values.tolist()
        print("Getting categorical features...")
        # transform categorical data
        cat_features = self._transform_categorical_attributes(data[self.CATEGORICAL_ATTRIBUTES])
        # append categorical features to features
        features = self._append_features(features, cat_features)
        print("Getting textual features...")
        # transform textual data
        tex_features = self._transform_textual_attributes(data[self.TEXTUAL_ATTRIBUTES])
        # append textual features to features
        features = self._append_features(features, tex_features)
        print("Normalizing features...")
        # scale features
        features = self._transform_feature_scaler(features)
        # return features
        return features

    # predict classes of raw input
    def predict(self, test_data):
        # extract features
        test_features = self._extract_features(test_data)
        print("Predicting classes...")
        # predict classes
        return self.classifier.predict(test_features)

    # predict class probabilities of raw input
    def predict_proba(self, test_data):
        # extract features
        test_features = self._extract_features(test_data)
        print("Predicting probabilities...")
        # predict class probabilities
        return self.classifier.predict_proba(test_features)
# nfs_wrapper.py
import pickle
import lief
import pandas as pd
from pe_attribute_extractor import PEAttributeExtractor

class NFSWrapper():

    def __init__(self, model, threshold=0.8):
        # load model
        self.clf = pickle.load(model)
        # set threshold
        self.threshold = threshold

    def predict(self, bytez: bytes) -> int:
        try:
            # initialize attribute extractor
            pe_att_ext = PEAttributeExtractor(bytez)
            # extract attributes
            atts = pe_att_ext.extract()
            # create dataframe
            atts = pd.DataFrame([atts])
            # predict sample probability
            prob = self.clf.predict_proba(atts)[0]
            # get prediction according to goodware (gw) probability
            pred = int(prob[0] < self.threshold)
            # calc probability
            if pred:
                # calc normalized malware (mw) probability
                prob[pred] = 0.5 + ((self.threshold - prob[0]) / self.threshold) * 0.5
            else:
                # calc normalized goodware (gw) probability
                prob[pred] = 0.5 + ((prob[0] - self.threshold) / (1 - self.threshold)) * 0.5
        # exception names match the LIEF release used at the time;
        # newer LIEF versions expose different error types
        except (lief.bad_format, lief.read_out_of_bound) as e:
            # error parsing the PE file: we conservatively
            # consider it malware
            print("Error: ", e)
            pred = 1
            prob = [0, 1]
        # return prediction and probability
        return int(pred), prob[pred]
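
The threshold logic above maps the classifier's raw goodware probability prob[0] onto a confidence score that always places the decision boundary at 0.5. The standalone sketch below re-derives the same piecewise mapping so it is easy to verify; the function name normalize is mine, not part of the wrapper.

# standalone sketch of NFSWrapper's probability normalization:
# a sample is flagged as malware when its goodware probability gw
# falls below the threshold t, and the reported confidence is
# rescaled so the decision boundary itself maps to exactly 0.5
def normalize(gw, t=0.8):
    pred = int(gw < t)
    if pred:  # malware: gw in [0, t) maps to confidence (0.5, 1.0]
        conf = 0.5 + ((t - gw) / t) * 0.5
    else:     # goodware: gw in [t, 1] maps to confidence [0.5, 1.0]
        conf = 0.5 + ((gw - t) / (1 - t)) * 0.5
    return pred, conf

print(normalize(0.0))   # (1, 1.0) -> certain malware
print(normalize(0.8))   # (0, 0.5) -> right at the decision boundary
print(normalize(1.0))   # (0, 1.0) -> certain goodware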
# pe_attribute_extractor.py
import re
import math
import lief

class PEAttributeExtractor():

    libraries = ""
    functions = ""
    exports = ""

    # initialize extractor
    def __init__(self, bytez):
        # save bytes
        self.bytez = bytez
        # parse using lief
        self.lief_binary = lief.PE.parse(list(bytez))
        # attributes
        self.attributes = {}

    # extract string metadata
    def extract_string_metadata(self):
        # occurrences of the string 'c:\'
        paths = re.compile(b'c:\\\\', re.IGNORECASE)
        # occurrences of http:// or https://
        urls = re.compile(b'https?://', re.IGNORECASE)
        # occurrences of the string prefix HKEY_
        registry = re.compile(b'HKEY_')
        # occurrences of the MZ header marker
        mz = re.compile(b'MZ')
        return {
            'string_paths': len(paths.findall(self.bytez)),
            'string_urls': len(urls.findall(self.bytez)),
            'string_registry': len(registry.findall(self.bytez)),
            'string_MZ': len(mz.findall(self.bytez))
        }

    # extract entropy
    def extract_entropy(self):
        if not self.bytez:
            return 0
        entropy = 0
        for x in range(256):
            # bytes([x]) builds the single-byte pattern to count;
            # bytes(x) would instead create x zero bytes
            p_x = float(self.bytez.count(bytes([x]))) / len(self.bytez)
            if p_x > 0:
                entropy += - p_x * math.log(p_x, 2)
        return entropy

    # extract attributes
    def extract(self):
        # get general info
        self.attributes.update({
            "size": len(self.bytez),
            "virtual_size": self.lief_binary.virtual_size,
            "has_debug": int(self.lief_binary.has_debug),
            "imports": len(self.lief_binary.imports),
            "exports": len(self.lief_binary.exported_functions),
            "has_relocations": int(self.lief_binary.has_relocations),
            "has_resources": int(self.lief_binary.has_resources),
            "has_signature": int(self.lief_binary.has_signature),
            "has_tls": int(self.lief_binary.has_tls),
            "symbols": len(self.lief_binary.symbols),
        })
        # get header info
        self.attributes.update({
            "timestamp": self.lief_binary.header.time_date_stamps,
            "machine": str(self.lief_binary.header.machine),
            "numberof_sections": self.lief_binary.header.numberof_sections,
            "numberof_symbols": self.lief_binary.header.numberof_symbols,
            "pointerto_symbol_table": self.lief_binary.header.pointerto_symbol_table,
            "sizeof_optional_header": self.lief_binary.header.sizeof_optional_header,
            "characteristics": int(self.lief_binary.header.characteristics),
            "characteristics_list": " ".join([str(c).replace("HEADER_CHARACTERISTICS.", "") for c in self.lief_binary.header.characteristics_list])
        })
        # baseof_data only exists for PE32 binaries
        try:
            baseof_data = self.lief_binary.optional_header.baseof_data
        except Exception:
            baseof_data = 0
        # get optional header
        self.attributes.update({
            "baseof_code": self.lief_binary.optional_header.baseof_code,
            "baseof_data": baseof_data,
            "dll_characteristics": self.lief_binary.optional_header.dll_characteristics,
            "dll_characteristics_list": " ".join([str(d).replace("DLL_CHARACTERISTICS.", "") for d in self.lief_binary.optional_header.dll_characteristics_lists]),
            "file_alignment": self.lief_binary.optional_header.file_alignment,
            "imagebase": self.lief_binary.optional_header.imagebase,
            "magic": str(self.lief_binary.optional_header.magic).replace("PE_TYPE.", ""),
            "PE_TYPE": int(self.lief_binary.optional_header.magic),
            "major_image_version": self.lief_binary.optional_header.major_image_version,
            "minor_image_version": self.lief_binary.optional_header.minor_image_version,
            "major_linker_version": self.lief_binary.optional_header.major_linker_version,
            "minor_linker_version": self.lief_binary.optional_header.minor_linker_version,
            "major_operating_system_version": self.lief_binary.optional_header.major_operating_system_version,
            "minor_operating_system_version": self.lief_binary.optional_header.minor_operating_system_version,
            "major_subsystem_version": self.lief_binary.optional_header.major_subsystem_version,
            "minor_subsystem_version": self.lief_binary.optional_header.minor_subsystem_version,
            "numberof_rva_and_size": self.lief_binary.optional_header.numberof_rva_and_size,
            "sizeof_code": self.lief_binary.optional_header.sizeof_code,
            "sizeof_headers": self.lief_binary.optional_header.sizeof_headers,
            "sizeof_heap_commit": self.lief_binary.optional_header.sizeof_heap_commit,
            "sizeof_image": self.lief_binary.optional_header.sizeof_image,
            "sizeof_initialized_data": self.lief_binary.optional_header.sizeof_initialized_data,
            "sizeof_uninitialized_data": self.lief_binary.optional_header.sizeof_uninitialized_data,
            "subsystem": str(self.lief_binary.optional_header.subsystem).replace("SUBSYSTEM.", "")
        })
        # get entropy
        self.attributes.update({
            "entropy": self.extract_entropy()
        })
        # get string metadata
        self.attributes.update(self.extract_string_metadata())
        # get imported libraries and functions
        if self.lief_binary.has_imports:
            self.libraries = " ".join([l for l in self.lief_binary.libraries])
            self.functions = " ".join([f.name for f in self.lief_binary.imported_functions])
        self.attributes.update({"functions": self.functions, "libraries": self.libraries})
        # get exports
        if self.lief_binary.has_exports:
            self.exports = " ".join([f.name for f in self.lief_binary.exported_functions])
        self.attributes.update({"exports_list": self.exports})
        return self.attributes
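
extract_entropy computes the Shannon entropy H = -sum(p_x * log2(p_x)) over the 256 possible byte values, a common heuristic for spotting packed or encrypted sections. A standalone sketch of the same computation (a hypothetical helper, not part of the class) shows the two boundary cases:

import math

# Shannon entropy of a byte string, in bits per byte
def shannon_entropy(data: bytes) -> float:
    if not data:
        return 0.0
    entropy = 0.0
    for x in range(256):
        p_x = data.count(bytes([x])) / len(data)
        if p_x > 0:
            entropy -= p_x * math.log(p_x, 2)
    return entropy

print(shannon_entropy(b"\x00" * 64))       # 0.0: a single repeated byte carries no information
print(shannon_entropy(bytes(range(256))))  # 8.0: uniformly distributed bytes hit the maximum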
# prediction script: classify a PE file given on the command line
import sys
from nfs_wrapper import NFSWrapper

# initialize classifier with
# pre-trained model
with open("nfs.pickle", "rb") as model_file:
    clf = NFSWrapper(model_file)
# open the test file and read its bytes
with open(sys.argv[1], 'rb') as test_file:
    bytez = test_file.read()
# predict PE file
pred, prob = clf.predict(bytez)
# print prediction and probability
print("Prediction: ", pred)
print("Probability: ", prob)
# training script: extract attributes, train the model, and pickle it
import pickle
import pandas as pd
from json_attribute_extractor import JSONAttributeExtractor
from need_for_speed_model import NeedForSpeedModel

# list of files used to train the model, in
# the same format as the EMBER 2017 and 2018
# datasets
files = [
    "ember_2017_2/train_features_0.jsonl",
    "ember_2017_2/train_features_1.jsonl",
    "ember_2017_2/train_features_2.jsonl",
    "ember_2017_2/train_features_3.jsonl",
    "ember_2017_2/train_features_4.jsonl",
    "ember_2017_2/train_features_5.jsonl",
    "ember_2017_2/test_features.jsonl",
    "ember2018/train_features_0.jsonl",
    "ember2018/train_features_1.jsonl",
    "ember2018/train_features_2.jsonl",
    "ember2018/train_features_3.jsonl",
    "ember2018/train_features_4.jsonl",
    "ember2018/train_features_5.jsonl",
    "ember2018/test_features.jsonl"
]

if __name__ == '__main__':
    train_attributes = []
    # iterate over input files
    for input_file in files:
        # read the file's lines (one JSON record per software sample)
        with open(input_file, 'r') as f:
            sws = f.readlines()
        # iterate over each sample (sw)
        for sw in sws:
            # initialize extractor
            at_extractor = JSONAttributeExtractor(sw)
            # get attributes
            atts = at_extractor.extract()
            # save attributes
            train_attributes.append(atts)
    # create pandas dataframe with train attributes
    train_data = pd.DataFrame(train_attributes)
    # keep only labeled samples (EMBER marks unlabeled ones with -1)
    train_data = train_data[(train_data["label"] == 1) | (train_data["label"] == 0)]
    # initialize NFS model
    clf = NeedForSpeedModel()
    # train model
    clf.fit(train_data)
    # save model
    with open('nfs.pickle', 'wb') as f:
        pickle.dump(clf, f)
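
Once nfs.pickle has been written, a quick sanity check is to score the model against one of the labeled EMBER test files. The sketch below is hypothetical (not part of the original gist): it assumes the module layout implied by the imports above, and accuracy is used only as an illustrative metric.

import pickle
import pandas as pd
from sklearn.metrics import accuracy_score
from json_attribute_extractor import JSONAttributeExtractor

# load the pickled NeedForSpeedModel
with open("nfs.pickle", "rb") as f:
    clf = pickle.load(f)
# extract attributes from a labeled EMBER test file
with open("ember2018/test_features.jsonl", "r") as f:
    test_attributes = [JSONAttributeExtractor(line).extract() for line in f]
test_data = pd.DataFrame(test_attributes)
# keep only labeled samples, as in training
test_data = test_data[(test_data["label"] == 1) | (test_data["label"] == 0)]
labels = test_data["label"].values
# predict() selects its own feature columns, so the extra
# "label" column in test_data is simply ignored
preds = clf.predict(test_data)
print("Accuracy:", accuracy_score(labels, preds))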