MLSEC 2020: Need for Speed Malware Detection Model
json_attribute_extractor.py
import json


class JSONAttributeExtractor():

    # initialize extractor
    def __init__(self, file):
        # save data
        self.data = json.loads(file)
        # attributes
        self.attributes = {}

    # extract string metadata
    def extract_string_metadata(self):
        return {
            'string_paths': self.data["strings"]["paths"],
            'string_urls': self.data["strings"]["urls"],
            'string_registry': self.data["strings"]["registry"],
            'string_MZ': self.data["strings"]["MZ"]
        }

    # extract attributes
    def extract(self):
        # get general info
        self.attributes.update({
            "size": self.data["general"]["size"],
            "virtual_size": self.data["general"]["vsize"],
            "has_debug": self.data["general"]["has_debug"],
            "imports": self.data["general"]["imports"],
            "exports": self.data["general"]["exports"],
            "has_relocations": self.data["general"]["has_relocations"],
            "has_resources": self.data["general"]["has_resources"],
            "has_signature": self.data["general"]["has_signature"],
            "has_tls": self.data["general"]["has_tls"],
            "symbols": self.data["general"]["symbols"],
        })
        # get header info
        self.attributes.update({
            "timestamp": self.data["header"]["coff"]["timestamp"],
            "machine": self.data["header"]["coff"]["machine"],
            "numberof_sections": len(self.data["section"]["sections"]),
            "characteristics_list": " ".join(self.data["header"]["coff"]["characteristics"])
        })
        # get optional header
        self.attributes.update({
            "dll_characteristics_list": " ".join(self.data["header"]["optional"]["dll_characteristics"]),
            "magic": self.data["header"]["optional"]["magic"],
            "major_image_version": self.data["header"]["optional"]["major_image_version"],
            "minor_image_version": self.data["header"]["optional"]["minor_image_version"],
            "major_linker_version": self.data["header"]["optional"]["major_linker_version"],
            "minor_linker_version": self.data["header"]["optional"]["minor_linker_version"],
            "major_operating_system_version": self.data["header"]["optional"]["major_operating_system_version"],
            "minor_operating_system_version": self.data["header"]["optional"]["minor_operating_system_version"],
            "major_subsystem_version": self.data["header"]["optional"]["major_subsystem_version"],
            "minor_subsystem_version": self.data["header"]["optional"]["minor_subsystem_version"],
            "sizeof_code": self.data["header"]["optional"]["sizeof_code"],
            "sizeof_headers": self.data["header"]["optional"]["sizeof_headers"],
            "sizeof_heap_commit": self.data["header"]["optional"]["sizeof_heap_commit"]
        })
        # get string metadata
        self.attributes.update(self.extract_string_metadata())
        # get imported libraries and functions
        # (in the EMBER format, "imports" maps each library name to its list of imported functions)
        self.libraries = " ".join(self.data["imports"].keys())
        self.functions = " ".join([item for sublist in self.data["imports"].values() for item in sublist])
        self.attributes.update({"functions": self.functions, "libraries": self.libraries})
        # get exports
        self.exports = " ".join(self.data["exports"])
        self.attributes.update({"exports_list": self.exports})
        # get label
        self.label = self.data["label"]
        self.attributes.update({"label": self.label})
        return self.attributes
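A minimal usage sketch for the extractor above (the path is illustrative and assumes a local copy of an EMBER-format JSONL file): each line of the file is one JSON record, and extract() flattens it into the attribute dictionary consumed by NeedForSpeedModel.

# Illustrative only: the path assumes the EMBER 2018 dataset is available locally.
import pandas as pd

with open("ember2018/train_features_0.jsonl", "r") as f:
    first_record = f.readline()

extractor = JSONAttributeExtractor(first_record)
attributes = extractor.extract()

# one row per sample, which is the input format NeedForSpeedModel expects
df = pd.DataFrame([attributes])
print(df[["size", "machine", "label"]])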
need_for_speed_model.py
from copy import deepcopy

from sklearn.preprocessing import OneHotEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestClassifier


# need for speed (NFS) model
class NeedForSpeedModel():

    # numerical attributes
    NUMERICAL_ATTRIBUTES = [
        'string_paths', 'string_urls', 'string_registry', 'string_MZ', 'size',
        'virtual_size', 'has_debug', 'imports', 'exports', 'has_relocations',
        'has_resources', 'has_signature', 'has_tls', 'symbols', 'timestamp',
        'numberof_sections', 'major_image_version', 'minor_image_version',
        'major_linker_version', 'minor_linker_version', 'major_operating_system_version',
        'minor_operating_system_version', 'major_subsystem_version',
        'minor_subsystem_version', 'sizeof_code', 'sizeof_headers', 'sizeof_heap_commit'
    ]

    # categorical attributes
    CATEGORICAL_ATTRIBUTES = [
        'machine', 'magic'
    ]

    # textual attributes
    TEXTUAL_ATTRIBUTES = ['libraries', 'functions', 'exports_list',
                          'dll_characteristics_list', 'characteristics_list']

    # label column
    LABEL = "label"

    # initialize NFS classifier
    def __init__(self,
                 categorical_extractor = OneHotEncoder(handle_unknown="ignore"),
                 textual_extractor = TfidfVectorizer(max_features=300),
                 feature_scaler = MinMaxScaler(),
                 classifier = RandomForestClassifier(n_estimators=100)):
        self.base_categorical_extractor = categorical_extractor
        self.base_textual_extractor = textual_extractor
        self.base_feature_scaler = feature_scaler
        self.base_classifier = classifier

    # append new feature columns to the original feature list, row by row
    def _append_features(self, original_features, appended):
        if original_features:
            for l1, l2 in zip(original_features, appended):
                for i in l2:
                    l1.append(i)
            return original_features
        else:
            return appended.tolist()

    # train a categorical extractor
    def _train_categorical_extractor(self, categorical_attributes):
        # initialize categorical extractor
        self.categorical_extractor = deepcopy(self.base_categorical_extractor)
        # train categorical extractor
        self.categorical_extractor.fit(categorical_attributes.values)

    # transform categorical attributes into features
    def _transform_categorical_attributes(self, categorical_attributes):
        # transform categorical attributes using the categorical extractor
        cat_features = self.categorical_extractor.transform(categorical_attributes.values.tolist()).toarray()
        # return categorical features
        return cat_features.tolist()

    # train one textual extractor per textual attribute
    def _train_textual_extractor(self, textual_attributes):
        # initialize textual extractors
        self.textual_extractors = {}
        # train a feature extractor for each textual attribute
        for att in self.TEXTUAL_ATTRIBUTES:
            # initialize textual extractor
            self.textual_extractors[att] = deepcopy(self.base_textual_extractor)
            # train textual extractor
            self.textual_extractors[att].fit(textual_attributes[att].values)

    # transform textual attributes into features
    def _transform_textual_attributes(self, textual_attributes):
        # initialize features
        textual_features = None
        # extract features from each textual attribute
        for att in self.TEXTUAL_ATTRIBUTES:
            # transform the textual attribute using its extractor
            att_features = self.textual_extractors[att].transform(textual_attributes[att].values)
            # transform into an array (it is a sparse matrix)
            att_features = att_features.toarray()
            # append textual features
            textual_features = self._append_features(textual_features, att_features)
        return textual_features

    # train feature scaler
    def _train_feature_scaler(self, features):
        # initialize feature scaler
        self.feature_scaler = deepcopy(self.base_feature_scaler)
        # train feature scaler
        self.feature_scaler.fit(features)

    # transform features using the feature scaler
    def _transform_feature_scaler(self, features):
        return self.feature_scaler.transform(features)

    # train classifier
    def _train_classifier(self, features, labels):
        # initialize classifier
        self.classifier = deepcopy(self.base_classifier)
        # train classifier
        self.classifier.fit(features, labels)

    # fit classifier using raw attribute data
    def fit(self, train_data):
        # get labels
        train_labels = train_data[self.LABEL]
        # delete label column (note: this modifies the caller's DataFrame in place)
        del train_data[self.LABEL]
        # initialize train_features with the numerical ones
        train_features = train_data[self.NUMERICAL_ATTRIBUTES].values.tolist()
        print("Training categorical features...")
        # train categorical extractor
        self._train_categorical_extractor(train_data[self.CATEGORICAL_ATTRIBUTES])
        # transform categorical data
        cat_train_features = self._transform_categorical_attributes(train_data[self.CATEGORICAL_ATTRIBUTES])
        # append categorical features to train_features
        train_features = self._append_features(train_features, cat_train_features)
        print("Training textual features...")
        # train textual extractors
        self._train_textual_extractor(train_data[self.TEXTUAL_ATTRIBUTES])
        # transform textual data
        tex_train_features = self._transform_textual_attributes(train_data[self.TEXTUAL_ATTRIBUTES])
        # append textual features to train_features
        train_features = self._append_features(train_features, tex_train_features)
        print("Normalizing features...")
        # train feature scaler
        self._train_feature_scaler(train_features)
        # transform features
        train_features = self._transform_feature_scaler(train_features)
        print("Training classifier...")
        # train classifier
        return self._train_classifier(train_features, train_labels)

    # extract features from raw attribute data
    def _extract_features(self, data):
        # initialize features with the numerical ones
        features = data[self.NUMERICAL_ATTRIBUTES].values.tolist()
        print("Getting categorical features...")
        # transform categorical data
        cat_features = self._transform_categorical_attributes(data[self.CATEGORICAL_ATTRIBUTES])
        # append categorical features to features
        features = self._append_features(features, cat_features)
        print("Getting textual features...")
        # transform textual data
        tex_features = self._transform_textual_attributes(data[self.TEXTUAL_ATTRIBUTES])
        # append textual features to features
        features = self._append_features(features, tex_features)
        print("Normalizing features...")
        # scale features
        features = self._transform_feature_scaler(features)
        # return features
        return features

    # predict classes for raw attribute data
    def predict(self, test_data):
        # extract features
        test_features = self._extract_features(test_data)
        print("Predicting classes...")
        # predict classes
        return self.classifier.predict(test_features)

    # predict class probabilities for raw attribute data
    def predict_proba(self, test_data):
        # extract features
        test_features = self._extract_features(test_data)
        print("Predicting probabilities...")
        # predict class probabilities
        return self.classifier.predict_proba(test_features)
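A hypothetical usage sketch for the model above: train_df and test_df are assumed to be attribute DataFrames built with JSONAttributeExtractor (one row per sample, each including the "label" column); they are not defined in the original scripts.

# Hypothetical: train_df and test_df are pandas DataFrames of extracted attributes.
model = NeedForSpeedModel()
model.fit(train_df)                                    # note: fit() removes the "label" column in place
predicted_labels = model.predict(test_df)              # hard 0/1 predictions
malware_scores = model.predict_proba(test_df)[:, 1]    # probability of class 1 (malware)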
nfs_wrapper.py
import pickle

import lief
import pandas as pd

from pe_attribute_extractor import PEAttributeExtractor


class NFSWrapper():

    def __init__(self, model, threshold = 0.8):
        # load pickled model
        self.clf = pickle.load(model)
        # set goodware-probability threshold
        self.threshold = threshold

    def predict(self, bytez: bytes) -> tuple:
        try:
            # initialize attribute extractor
            pe_att_ext = PEAttributeExtractor(bytez)
            # extract attributes
            atts = pe_att_ext.extract()
            # create a single-row dataframe
            atts = pd.DataFrame([atts])
            # predict sample probability
            prob = self.clf.predict_proba(atts)[0]
            # get prediction according to the goodware (gw) probability
            pred = int(prob[0] < self.threshold)
            # calculate normalized probability
            if pred:
                # calculate normalized malware (mw) probability
                prob[pred] = 0.5 + ((self.threshold - prob[0]) / self.threshold) * 0.5
            else:
                # calculate normalized goodware (gw) probability
                prob[pred] = 0.5 + ((prob[0] - self.threshold) / (1 - self.threshold)) * 0.5
        except (lief.bad_format, lief.read_out_of_bound) as e:
            # error parsing the PE file, so we consider it malware
            print("Error: ", e)
            pred = 1
            prob = [0, 1]
        # return prediction and probability
        return (int(pred), prob[pred])
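For reference, a worked check of the score normalization above, using the default threshold of 0.8: the returned score is always at least 0.5 for whichever class is predicted.

# Worked check of NFSWrapper's score normalization (threshold = 0.8).
threshold = 0.8
gw_prob = 0.6   # goodware probability below the threshold -> predicted malware
print(0.5 + ((threshold - gw_prob) / threshold) * 0.5)        # 0.625
gw_prob = 0.9   # goodware probability at or above the threshold -> predicted goodware
print(0.5 + ((gw_prob - threshold) / (1 - threshold)) * 0.5)  # 0.75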
pe_attribute_extractor.py
import re
import math

import lief


class PEAttributeExtractor():

    # default values for import/export attributes
    libraries = ""
    functions = ""
    exports = ""

    # initialize extractor
    def __init__(self, bytez):
        # save bytes
        self.bytez = bytez
        # parse using lief
        self.lief_binary = lief.PE.parse(list(bytez))
        # attributes
        self.attributes = {}

    # extract string metadata
    def extract_string_metadata(self):
        # occurrences of the string 'c:\'
        paths = re.compile(b'c:\\\\', re.IGNORECASE)
        # occurrences of http:// or https://
        urls = re.compile(b'https?://', re.IGNORECASE)
        # occurrences of the string prefix HKEY_
        registry = re.compile(b'HKEY_')
        # occurrences of the MZ header marker
        mz = re.compile(b'MZ')
        return {
            'string_paths': len(paths.findall(self.bytez)),
            'string_urls': len(urls.findall(self.bytez)),
            'string_registry': len(registry.findall(self.bytez)),
            'string_MZ': len(mz.findall(self.bytez))
        }

    # extract byte entropy of the whole file (in bits per byte)
    def extract_entropy(self):
        if not self.bytez:
            return 0
        entropy = 0
        for x in range(256):
            # bytes.count(int) counts occurrences of the byte value x
            p_x = float(self.bytez.count(x)) / len(self.bytez)
            if p_x > 0:
                entropy += -p_x * math.log(p_x, 2)
        return entropy

    # extract attributes
    def extract(self):
        # get general info
        self.attributes.update({
            "size": len(self.bytez),
            "virtual_size": self.lief_binary.virtual_size,
            "has_debug": int(self.lief_binary.has_debug),
            "imports": len(self.lief_binary.imports),
            "exports": len(self.lief_binary.exported_functions),
            "has_relocations": int(self.lief_binary.has_relocations),
            "has_resources": int(self.lief_binary.has_resources),
            "has_signature": int(self.lief_binary.has_signature),
            "has_tls": int(self.lief_binary.has_tls),
            "symbols": len(self.lief_binary.symbols),
        })
        # get header info
        self.attributes.update({
            "timestamp": self.lief_binary.header.time_date_stamps,
            "machine": str(self.lief_binary.header.machine),
            "numberof_sections": self.lief_binary.header.numberof_sections,
            "numberof_symbols": self.lief_binary.header.numberof_symbols,
            "pointerto_symbol_table": self.lief_binary.header.pointerto_symbol_table,
            "sizeof_optional_header": self.lief_binary.header.sizeof_optional_header,
            "characteristics": int(self.lief_binary.header.characteristics),
            "characteristics_list": " ".join([str(c).replace("HEADER_CHARACTERISTICS.", "") for c in self.lief_binary.header.characteristics_list])
        })
        # baseof_data only exists for PE32 binaries
        try:
            baseof_data = self.lief_binary.optional_header.baseof_data
        except Exception:
            baseof_data = 0
        # get optional header info
        self.attributes.update({
            "baseof_code": self.lief_binary.optional_header.baseof_code,
            "baseof_data": baseof_data,
            "dll_characteristics": self.lief_binary.optional_header.dll_characteristics,
            "dll_characteristics_list": " ".join([str(d).replace("DLL_CHARACTERISTICS.", "") for d in self.lief_binary.optional_header.dll_characteristics_lists]),
            "file_alignment": self.lief_binary.optional_header.file_alignment,
            "imagebase": self.lief_binary.optional_header.imagebase,
            "magic": str(self.lief_binary.optional_header.magic).replace("PE_TYPE.", ""),
            "PE_TYPE": int(self.lief_binary.optional_header.magic),
            "major_image_version": self.lief_binary.optional_header.major_image_version,
            "minor_image_version": self.lief_binary.optional_header.minor_image_version,
            "major_linker_version": self.lief_binary.optional_header.major_linker_version,
            "minor_linker_version": self.lief_binary.optional_header.minor_linker_version,
            "major_operating_system_version": self.lief_binary.optional_header.major_operating_system_version,
            "minor_operating_system_version": self.lief_binary.optional_header.minor_operating_system_version,
            "major_subsystem_version": self.lief_binary.optional_header.major_subsystem_version,
            "minor_subsystem_version": self.lief_binary.optional_header.minor_subsystem_version,
            "numberof_rva_and_size": self.lief_binary.optional_header.numberof_rva_and_size,
            "sizeof_code": self.lief_binary.optional_header.sizeof_code,
            "sizeof_headers": self.lief_binary.optional_header.sizeof_headers,
            "sizeof_heap_commit": self.lief_binary.optional_header.sizeof_heap_commit,
            "sizeof_image": self.lief_binary.optional_header.sizeof_image,
            "sizeof_initialized_data": self.lief_binary.optional_header.sizeof_initialized_data,
            "sizeof_uninitialized_data": self.lief_binary.optional_header.sizeof_uninitialized_data,
            "subsystem": str(self.lief_binary.optional_header.subsystem).replace("SUBSYSTEM.", "")
        })
        # get entropy
        self.attributes.update({
            "entropy": self.extract_entropy()
        })
        # get string metadata
        self.attributes.update(self.extract_string_metadata())
        # get imported libraries and functions
        # (always set these keys so the expected DataFrame columns exist even without imports/exports)
        if self.lief_binary.has_imports:
            self.libraries = " ".join([l for l in self.lief_binary.libraries])
            self.functions = " ".join([f.name for f in self.lief_binary.imported_functions])
        self.attributes.update({"functions": self.functions, "libraries": self.libraries})
        # get exports
        if self.lief_binary.has_exports:
            self.exports = " ".join([f.name for f in self.lief_binary.exported_functions])
        self.attributes.update({"exports_list": self.exports})
        return self.attributes
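A minimal sketch of using the extractor directly (the file path is illustrative); this mirrors what NFSWrapper.predict() does before handing the attributes to the model.

# Illustrative only: "sample.exe" is a placeholder path to some PE file on disk.
with open("sample.exe", "rb") as f:
    bytez = f.read()

pe_extractor = PEAttributeExtractor(bytez)
pe_attributes = pe_extractor.extract()
print(pe_attributes["machine"], pe_attributes["entropy"])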
Example: classifying a PE file with the trained model (nfs.pickle)
import sys

from nfs_wrapper import NFSWrapper

# initialize the classifier with the pre-trained model
clf = NFSWrapper(open("nfs.pickle", "rb"))
# open the test file given on the command line
test_file = open(sys.argv[1], 'rb')
# get its bytes
bytez = test_file.read()
# classify the PE file
pred, prob = clf.predict(bytez)
# print prediction and probability
print("Prediction: ", pred)
print("Probability: ", prob)
Training script: building the model from the EMBER datasets
import pickle

import pandas as pd

from json_attribute_extractor import JSONAttributeExtractor
from need_for_speed_model import NeedForSpeedModel

# list of files used to train the model, in the same
# format as the EMBER 2017 and 2018 datasets
files = [
    "ember_2017_2/train_features_0.jsonl",
    "ember_2017_2/train_features_1.jsonl",
    "ember_2017_2/train_features_2.jsonl",
    "ember_2017_2/train_features_3.jsonl",
    "ember_2017_2/train_features_4.jsonl",
    "ember_2017_2/train_features_5.jsonl",
    "ember_2017_2/test_features.jsonl",
    "ember2018/train_features_0.jsonl",
    "ember2018/train_features_1.jsonl",
    "ember2018/train_features_2.jsonl",
    "ember2018/train_features_3.jsonl",
    "ember2018/train_features_4.jsonl",
    "ember2018/train_features_5.jsonl",
    "ember2018/test_features.jsonl"
]

if __name__ == '__main__':
    train_attributes = []
    # iterate over the input files
    for input_file in files:
        # read input file
        file = open(input_file, 'r')
        # read its lines (one JSON record per sample)
        sws = file.readlines()
        # iterate over each sample
        for sw in sws:
            # initialize extractor
            at_extractor = JSONAttributeExtractor(sw)
            # extract attributes
            atts = at_extractor.extract()
            # save attributes
            train_attributes.append(atts)
        # close file
        file.close()
    # create pandas dataframe with train attributes
    train_data = pd.DataFrame(train_attributes)
    # keep only labeled samples (0 = goodware, 1 = malware)
    train_data = train_data[(train_data["label"] == 1) | (train_data["label"] == 0)]
    # initialize NFS model
    clf = NeedForSpeedModel()
    # train model
    clf.fit(train_data)
    # save model
    with open('nfs.pickle', 'wb') as f:
        pickle.dump(clf, f)
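A hedged evaluation sketch, not part of the original scripts: load the pickled model and compute a ROC AUC on a labeled EMBER-format file. The hold-out path below is illustrative; in the training script above that same file is actually used for training, so a real evaluation should use data the model has not seen.

# Illustrative evaluation sketch; the JSONL path stands in for any labeled
# EMBER-format file, ideally one held out from training.
import pickle
import pandas as pd
from sklearn.metrics import roc_auc_score
from json_attribute_extractor import JSONAttributeExtractor

with open("nfs.pickle", "rb") as f:
    clf = pickle.load(f)

attributes = []
with open("ember2018/test_features.jsonl", "r") as jsonl:
    for line in jsonl:
        attributes.append(JSONAttributeExtractor(line).extract())

test_data = pd.DataFrame(attributes)
test_data = test_data[test_data["label"].isin([0, 1])]    # drop unlabeled samples

probs = clf.predict_proba(test_data)[:, 1]                # probability of malware
print("ROC AUC:", roc_auc_score(test_data["label"], probs))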