Skip to content

Instantly share code, notes, and snippets.

@fabriciojoc
Last active September 24, 2020 05:30
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save fabriciojoc/ad51389f9af5c9a7c819381e80af2952 to your computer and use it in GitHub Desktop.
MLSEC 2020: Need for Speed Malware Detection Model
import json
class JSONAttributeExtractor():
    """Flatten one JSON sample record into the attribute dict used for training.

    The record must contain every section read by ``extract``: "general",
    "header" (with "coff" and "optional"), "section", "strings", "imports",
    "exports" and "label". A missing section or key raises ``KeyError``.
    """

    # initialize extractor
    def __init__(self, file):
        """Parse ``file``, a JSON document, and start with no attributes."""
        # save data
        self.data = json.loads(file)
        # attributes
        self.attributes = {}

    # extract string metadata
    def extract_string_metadata(self):
        """Return the four string statistics stored under data["strings"]."""
        strings = self.data["strings"]
        return {
            'string_paths': strings["paths"],
            'string_urls': strings["urls"],
            'string_registry': strings["registry"],
            'string_MZ': strings["MZ"]
        }

    # extract attributes
    def extract(self):
        """Fill ``self.attributes`` from the parsed record and return it.

        List-valued fields (characteristics, imports, exports) are joined
        with single spaces into one string each.
        """
        general = self.data["general"]
        coff = self.data["header"]["coff"]
        optional = self.data["header"]["optional"]
        # get general info
        self.attributes.update({
            "size": general["size"],
            "virtual_size": general["vsize"],
            "has_debug": general["has_debug"],
            "imports": general["imports"],
            "exports": general["exports"],
            "has_relocations": general["has_relocations"],
            "has_resources": general["has_resources"],
            "has_signature": general["has_signature"],
            "has_tls": general["has_tls"],
            "symbols": general["symbols"],
        })
        # get header info
        self.attributes.update({
            "timestamp": coff["timestamp"],
            "machine": coff["machine"],
            "numberof_sections": len(self.data["section"]["sections"]),
            "characteristics_list": " ".join(coff["characteristics"])
        })
        # get optional header
        self.attributes.update({
            "dll_characteristics_list": " ".join(optional["dll_characteristics"]),
            "magic": optional["magic"],
            "major_image_version": optional["major_image_version"],
            "minor_image_version": optional["minor_image_version"],
            "major_linker_version": optional["major_linker_version"],
            "minor_linker_version": optional["minor_linker_version"],
            "major_operating_system_version": optional["major_operating_system_version"],
            "minor_operating_system_version": optional["minor_operating_system_version"],
            "major_subsystem_version": optional["major_subsystem_version"],
            "minor_subsystem_version": optional["minor_subsystem_version"],
            "sizeof_code": optional["sizeof_code"],
            "sizeof_headers": optional["sizeof_headers"],
            "sizeof_heap_commit": optional["sizeof_heap_commit"]
        })
        # get string metadata
        self.attributes.update(self.extract_string_metadata())
        # get imported libraries and functions
        # NOTE(review): assumes the EMBER raw-feature layout, in which
        # "imports" maps each library name to the list of function names
        # imported from it -- confirm against the dataset. The keys are
        # therefore the libraries and the flattened values the functions;
        # the previous code had the two swapped, which disagreed with
        # PEAttributeExtractor (library names go in "libraries" there).
        imports = self.data["imports"]
        self.libraries = " ".join(imports.keys())
        self.functions = " ".join(
            name for names in imports.values() for name in names
        )
        self.attributes.update({"functions": self.functions, "libraries": self.libraries})
        # get exports
        self.exports = " ".join(self.data["exports"])
        self.attributes.update({"exports_list": self.exports})
        # get label
        self.label = self.data["label"]
        self.attributes.update({"label": self.label})
        return self.attributes
from copy import deepcopy
from sklearn.preprocessing import OneHotEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestClassifier
# need for speed class
class NeedForSpeedModel():
    """Classifier over numerical, categorical and textual sample attributes.

    ``fit``, ``predict`` and ``predict_proba`` take a pandas DataFrame that
    has one column per name listed in the class constants below. Numerical
    columns are used as-is, categorical columns go through the categorical
    extractor, each textual column gets its own copy of the textual
    extractor, and the concatenated features are scaled before reaching the
    classifier.
    """

    # numerical attributes
    NUMERICAL_ATTRIBUTES = [
        'string_paths', 'string_urls', 'string_registry', 'string_MZ', 'size',
        'virtual_size', 'has_debug', 'imports', 'exports', 'has_relocations',
        'has_resources', 'has_signature', 'has_tls', 'symbols', 'timestamp',
        'numberof_sections', 'major_image_version', 'minor_image_version',
        'major_linker_version', 'minor_linker_version', 'major_operating_system_version',
        'minor_operating_system_version', 'major_subsystem_version',
        'minor_subsystem_version', 'sizeof_code', 'sizeof_headers', 'sizeof_heap_commit'
    ]
    # categorical attributes
    CATEGORICAL_ATTRIBUTES = [
        'machine', 'magic'
    ]
    # textual attributes
    TEXTUAL_ATTRIBUTES = ['libraries', 'functions', 'exports_list',
                          'dll_characteristics_list', 'characteristics_list']
    # label
    LABEL = "label"

    # initialize NFS classifier
    def __init__(self,
                 categorical_extractor=None,
                 textual_extractor=None,
                 feature_scaler=None,
                 classifier=None):
        """Store the template estimators; each is deep-copied before training.

        Any argument left as ``None`` gets a fresh default:
        ``OneHotEncoder(handle_unknown="ignore")``,
        ``TfidfVectorizer(max_features=300)``, ``MinMaxScaler()`` and
        ``RandomForestClassifier(n_estimators=100)`` respectively.
        """
        # Defaults are built per instance instead of in the signature, where
        # they would be created once at class-definition time and shared.
        if categorical_extractor is None:
            categorical_extractor = OneHotEncoder(handle_unknown="ignore")
        if textual_extractor is None:
            textual_extractor = TfidfVectorizer(max_features=300)
        if feature_scaler is None:
            feature_scaler = MinMaxScaler()
        if classifier is None:
            classifier = RandomForestClassifier(n_estimators=100)
        self.base_categorical_extractor = categorical_extractor
        self.base_textual_extractor = textual_extractor
        self.base_feature_scaler = feature_scaler
        self.base_classifier = classifier

    # append features to original features list
    def _append_features(self, original_features, appended):
        """Extend every row of ``original_features`` with the matching row
        of ``appended`` and return the result.

        ``original_features`` is modified in place when it is a non-empty
        list of lists. When it is ``None`` or empty, ``appended`` itself is
        returned as a list of lists. ``appended`` may be a list of rows or
        any object exposing ``tolist()`` (such as a 2-D array).
        """
        if not original_features:
            if hasattr(appended, "tolist"):
                return appended.tolist()
            return [list(row) for row in appended]
        for row, extra in zip(original_features, appended):
            row.extend(extra.tolist() if hasattr(extra, "tolist") else extra)
        return original_features

    # train a categorical extractor
    def _train_categorical_extractor(self, categorical_attributes):
        """Fit a fresh copy of the categorical extractor."""
        # initialize categorical extractor
        self.categorical_extractor = deepcopy(self.base_categorical_extractor)
        # train categorical extractor
        self.categorical_extractor.fit(categorical_attributes.values)

    # transform categorical attributes into features
    def _transform_categorical_attributes(self, categorical_attributes):
        """Return the categorical features as a list of rows."""
        # transform categorical attributes using categorical extractor
        cat_features = self.categorical_extractor.transform(
            categorical_attributes.values.tolist()).toarray()
        # return categorical features
        return cat_features.tolist()

    # train a textual extractor
    def _train_textual_extractor(self, textual_attributes):
        """Fit one independent copy of the textual extractor per column."""
        # initialize textual extractors
        self.textual_extractors = {}
        # train feature extractor for each textual attribute
        for att in self.TEXTUAL_ATTRIBUTES:
            # initialize textual extractors
            self.textual_extractors[att] = deepcopy(self.base_textual_extractor)
            # train textual extractor
            self.textual_extractors[att].fit(textual_attributes[att].values)

    # transform textual extractor
    def _transform_textual_attributes(self, textual_attributes):
        """Return the per-column textual features concatenated row by row."""
        # initialize features
        textual_features = None
        # extract features from each textual attribute
        for att in self.TEXTUAL_ATTRIBUTES:
            # apply the extractor trained for this attribute
            att_features = self.textual_extractors[att].transform(textual_attributes[att].values)
            # transform into array (when it is a sparse matrix)
            att_features = att_features.toarray()
            # append textual features
            textual_features = self._append_features(textual_features, att_features)
        return textual_features

    # train feature scaler
    def _train_feature_scaler(self, features):
        """Fit a fresh copy of the feature scaler."""
        # initialize feature scaler
        self.feature_scaler = deepcopy(self.base_feature_scaler)
        # train feature scaler
        self.feature_scaler.fit(features)

    # transform features using feature scaler
    def _transform_feature_scaler(self, features):
        """Return ``features`` transformed by the trained scaler."""
        return self.feature_scaler.transform(features)

    # train classifier
    def _train_classifier(self, features, labels):
        """Fit a fresh copy of the classifier."""
        # initialize classifier
        self.classifier = deepcopy(self.base_classifier)
        # train classifier
        self.classifier.fit(features, labels)

    # fit classifier using raw input
    def fit(self, train_data):
        """Train every extractor, the scaler and the classifier.

        ``train_data`` must contain the ``LABEL`` column in addition to the
        attribute columns. The frame is left untouched: only the named
        attribute columns are read, so the label column does not need to be
        removed from it first.
        """
        # get labels
        train_labels = train_data[self.LABEL]
        # initialize train_features with numerical ones
        train_features = train_data[self.NUMERICAL_ATTRIBUTES].values.tolist()

        print("Training categorical features...")
        # train categorical extractor
        self._train_categorical_extractor(train_data[self.CATEGORICAL_ATTRIBUTES])
        # transform categorical data
        cat_train_features = self._transform_categorical_attributes(train_data[self.CATEGORICAL_ATTRIBUTES])
        # append categorical_features to train_features
        train_features = self._append_features(train_features, cat_train_features)

        print("Training textual features...")
        # train textual extractor
        self._train_textual_extractor(train_data[self.TEXTUAL_ATTRIBUTES])
        # transform textual data
        tex_train_features = self._transform_textual_attributes(train_data[self.TEXTUAL_ATTRIBUTES])
        # append textual_features to train_features
        train_features = self._append_features(train_features, tex_train_features)

        print("Normalizing features...")
        # train feature normalizer
        self._train_feature_scaler(train_features)
        # transform features
        train_features = self._transform_feature_scaler(train_features)

        print("Training classifier...")
        # train classifier
        return self._train_classifier(train_features, train_labels)

    def _extract_features(self, data):
        """Turn attribute rows into scaled features using the trained parts."""
        # initialize features with numerical ones
        features = data[self.NUMERICAL_ATTRIBUTES].values.tolist()

        print("Getting categorical features...")
        # transform categorical data
        cat_features = self._transform_categorical_attributes(data[self.CATEGORICAL_ATTRIBUTES])
        # append categorical_features to features
        features = self._append_features(features, cat_features)

        print("Getting textual features...")
        # transform textual data
        tex_features = self._transform_textual_attributes(data[self.TEXTUAL_ATTRIBUTES])
        # append textual_features to features
        features = self._append_features(features, tex_features)

        print("Normalizing features...")
        # transform features
        features = self._transform_feature_scaler(features)
        # return features
        return features

    def predict(self, test_data):
        """Return the classifier's predicted class for every row."""
        # extract features
        test_features = self._extract_features(test_data)
        print("Predicting classes...")
        # predict features
        return self.classifier.predict(test_features)

    def predict_proba(self, test_data):
        """Return the classifier's class probabilities for every row."""
        # extract features
        test_features = self._extract_features(test_data)
        print("Predicting classes...")
        # predict features
        return self.classifier.predict_proba(test_features)
import pickle
import lief
import pandas as pd
from pe_attribute_extractor import PEAttributeExtractor
class NFSWrapper():
    """Serve a pickled classifier behind a bytes-in, (label, confidence)-out API."""

    def __init__(self, model, threshold = 0.8):
        """Load the classifier from ``model`` and store the decision threshold.

        ``model`` is an open binary file object holding the pickled
        classifier. ``threshold`` is the minimum probability of class 0
        needed to label a sample 0.
        """
        # load model
        # SECURITY: unpickling runs arbitrary code embedded in the stream, so
        # only model files from a trusted source may be passed in here.
        self.clf = pickle.load(model)
        # set threshold
        self.threshold = threshold

    def _calibrate(self, gw_prob):
        """Map the class-0 probability to ``(label, confidence)``.

        The label is 1 when ``gw_prob`` is below the threshold and 0
        otherwise. The confidence rescales the distance from the threshold
        into [0.5, 1.0], separately on each side of it.
        NOTE(review): class 0 is presumably goodware and class 1 malware
        (the original comments call them "gw" and "mw") -- confirm.
        """
        pred = int(gw_prob < self.threshold)
        if pred:
            # label 1: distance below the threshold, relative to the threshold
            confidence = 0.5 + ((self.threshold - gw_prob) / self.threshold) * 0.5
        else:
            span = 1 - self.threshold
            if span == 0:
                # threshold of exactly 1: only gw_prob >= 1 lands here, and
                # the ratio below would be 0/0, so report full confidence
                confidence = 1.0
            else:
                # label 0: distance above the threshold, relative to the rest
                confidence = 0.5 + ((gw_prob - self.threshold) / span) * 0.5
        # always hand back a plain float, whatever numeric type came in
        return pred, float(confidence)

    def predict(self, bytez: bytes) -> tuple:
        """Classify the raw file content ``bytez``.

        Returns a ``(label, confidence)`` tuple of ``(int, float)``. A file
        that fails to parse with one of the caught lief errors is reported
        as label 1 with confidence 1.0.
        """
        try:
            # initialize attribute extractor
            pe_att_ext = PEAttributeExtractor(bytez)
            # extract attributes and create a one-row dataframe
            atts = pd.DataFrame([pe_att_ext.extract()])
            # predict sample probability
            prob = self.clf.predict_proba(atts)[0]
            # get prediction and confidence according to class-0 probability
            pred, confidence = self._calibrate(prob[0])
        except (lief.bad_format, lief.read_out_of_bound) as e:
            # error parsing PE file, we consider
            # it's a malware
            print("Error: ", e)
            pred, confidence = 1, 1.0
        # return prediction and probability
        return (pred, confidence)
import re
import math
import lief
class PEAttributeExtractor():
    """Extract a flat attribute dict from the raw bytes of a PE file.

    The bytes are parsed with lief on construction; ``extract`` then reads
    header fields, string statistics, entropy, imports and exports.
    """

    # defaults used when the binary has no imports / exports
    libraries = ""
    functions = ""
    exports = ""

    # occurrences of string 'C:\' (any case)
    _PATHS_RE = re.compile(b'c:\\\\', re.IGNORECASE)
    # occurrences of http:// or https:// (any case)
    _URLS_RE = re.compile(b'https?://', re.IGNORECASE)
    # occurrences of string prefix HKEY_
    _REGISTRY_RE = re.compile(b'HKEY_')
    # evidences of MZ header
    _MZ_RE = re.compile(b'MZ')

    # initialize extractor
    def __init__(self, bytez):
        """Keep ``bytez`` and parse it with lief."""
        # save bytes
        self.bytez = bytez
        # parse using lief, which is handed the content as a list of byte values
        self.lief_binary = lief.PE.parse(list(bytez))
        # attributes
        self.attributes = {}

    # extract string metadata
    def extract_string_metadata(self):
        """Count path, URL, registry-key and 'MZ' markers in the raw bytes."""
        return {
            'string_paths': len(self._PATHS_RE.findall(self.bytez)),
            'string_urls': len(self._URLS_RE.findall(self.bytez)),
            'string_registry': len(self._REGISTRY_RE.findall(self.bytez)),
            'string_MZ': len(self._MZ_RE.findall(self.bytez))
        }

    # extract entropy
    def extract_entropy(self):
        """Return the Shannon entropy of the bytes, in bits per byte.

        The result is 0 for empty input and at most 8.
        """
        if not self.bytez:
            return 0
        total = len(self.bytez)
        entropy = 0
        for x in range(256):
            # bytes([x]) is the single byte with value x; bytes(x) would be a
            # run of x NUL bytes and count something else entirely
            p_x = self.bytez.count(bytes([x])) / total
            if p_x > 0:
                entropy -= p_x * math.log2(p_x)
        return entropy

    # extract attributes
    def extract(self):
        """Fill ``self.attributes`` from the parsed binary and return it."""
        binary = self.lief_binary
        header = binary.header
        optional = binary.optional_header
        # get general info
        self.attributes.update({
            "size": len(self.bytez),
            "virtual_size": binary.virtual_size,
            "has_debug": int(binary.has_debug),
            "imports": len(binary.imports),
            "exports": len(binary.exported_functions),
            "has_relocations": int(binary.has_relocations),
            "has_resources": int(binary.has_resources),
            "has_signature": int(binary.has_signature),
            "has_tls": int(binary.has_tls),
            "symbols": len(binary.symbols),
        })
        # get header info
        self.attributes.update({
            "timestamp": header.time_date_stamps,
            "machine": str(header.machine),
            "numberof_sections": header.numberof_sections,
            "numberof_symbols": header.numberof_symbols,
            "pointerto_symbol_table": header.pointerto_symbol_table,
            "sizeof_optional_header": header.sizeof_optional_header,
            "characteristics": int(header.characteristics),
            "characteristics_list": " ".join(
                [str(c).replace("HEADER_CHARACTERISTICS.", "") for c in header.characteristics_list])
        })
        # baseof_data cannot always be read (presumably absent for some PE
        # variants -- TODO confirm); fall back to 0 on any ordinary error but
        # let KeyboardInterrupt / SystemExit propagate
        try:
            baseof_data = optional.baseof_data
        except Exception:
            baseof_data = 0
        # get optional header
        self.attributes.update({
            "baseof_code": optional.baseof_code,
            "baseof_data": baseof_data,
            "dll_characteristics": optional.dll_characteristics,
            "dll_characteristics_list": " ".join(
                [str(d).replace("DLL_CHARACTERISTICS.", "") for d in optional.dll_characteristics_lists]),
            "file_alignment": optional.file_alignment,
            "imagebase": optional.imagebase,
            "magic": str(optional.magic).replace("PE_TYPE.", ""),
            "PE_TYPE": int(optional.magic),
            "major_image_version": optional.major_image_version,
            "minor_image_version": optional.minor_image_version,
            "major_linker_version": optional.major_linker_version,
            "minor_linker_version": optional.minor_linker_version,
            "major_operating_system_version": optional.major_operating_system_version,
            "minor_operating_system_version": optional.minor_operating_system_version,
            "major_subsystem_version": optional.major_subsystem_version,
            "minor_subsystem_version": optional.minor_subsystem_version,
            "numberof_rva_and_size": optional.numberof_rva_and_size,
            "sizeof_code": optional.sizeof_code,
            "sizeof_headers": optional.sizeof_headers,
            "sizeof_heap_commit": optional.sizeof_heap_commit,
            "sizeof_image": optional.sizeof_image,
            "sizeof_initialized_data": optional.sizeof_initialized_data,
            "sizeof_uninitialized_data": optional.sizeof_uninitialized_data,
            "subsystem": str(optional.subsystem).replace("SUBSYSTEM.", "")
        })
        # get entropy
        self.attributes.update({
            "entropy": self.extract_entropy()
        })
        # get string metadata
        self.attributes.update(self.extract_string_metadata())
        # get imported libraries and functions
        if binary.has_imports:
            self.libraries = " ".join([l for l in binary.libraries])
            self.functions = " ".join([f.name for f in binary.imported_functions])
        # recorded even without imports, using the empty class-level defaults
        self.attributes.update({"functions": self.functions, "libraries": self.libraries})
        # get exports
        if binary.has_exports:
            self.exports = " ".join([f.name for f in binary.exported_functions])
        # recorded even without exports, using the empty class-level default
        self.attributes.update({"exports_list": self.exports})
        return self.attributes
import sys
from nfs_wrapper import NFSWrapper
# initialize classifier with the pre-trained model; the wrapper unpickles
# it inside its constructor, so the file can be closed right afterwards
with open("nfs.pickle", "rb") as model_file:
    clf = NFSWrapper(model_file)
# open test file (path given as first command-line argument) and get its
# bytes; the with-block closes the handle once the content is read
with open(sys.argv[1], 'rb') as test_file:
    bytez = test_file.read()
# predict pe file
pred, prob = clf.predict(bytez)
# print prediction and its probability
print("Prediction: ", pred)
print("Probability: ", prob)
import json
import pickle
import pandas as pd
from json_attribute_extractor import JSONAttributeExtractor
from need_for_speed_model import NeedForSpeedModel
# Files used to train the model: for each dataset directory, the six
# training shards followed by the test split, in the same format as the
# EMBER 2017 and 2018 datasets.
files = [
    "{}/{}".format(dataset_dir, shard)
    for dataset_dir in ("ember_2017_2", "ember2018")
    for shard in (
        ["train_features_{}.jsonl".format(i) for i in range(6)]
        + ["test_features.jsonl"]
    )
]
if __name__=='__main__':
    # Build the training table from every input file, train the model on the
    # labelled rows and save it to nfs.pickle.
    train_attributes = []
    # walk in files
    for path in files:
        # the with-block closes the file even when a record fails to parse
        with open(path, 'r') as handle:
            # one JSON record per line; iterating the handle streams the
            # file instead of loading every line into memory at once
            for sw in handle:
                # initialize extractor
                at_extractor = JSONAttributeExtractor(sw)
                # get and save the attributes of this sample
                train_attributes.append(at_extractor.extract())
    # create pandas dataframe with train attributes
    train_data = pd.DataFrame(train_attributes)
    # get train data that have label (rows with any other label value are dropped)
    train_data = train_data[(train_data["label"]==1) | (train_data["label"]==0)]
    # initialize nfs model
    clf = NeedForSpeedModel()
    # train model
    clf.fit(train_data)
    # save model
    with open('nfs.pickle', 'wb') as f:
        pickle.dump(clf, f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment