from sklearn.feature_extraction import DictVectorizer
from tqdm import tqdm_notebook
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.utils import shuffle
import glob
import os
from msbase.utils import load_json
from matplotlib.pyplot import cm
import pandas as pd
import numpy as np
import json
def load_vectors(vectors_dir: str, labels):
    """Load per-label feature vectors from ``<vectors_dir>/<label>-vectors.json``.

    Each JSON file must hold a sequence of ``(apk, vector)`` pairs.

    Args:
        vectors_dir: Directory containing the ``*-vectors.json`` files.
        labels: Label names; the position of a label in this sequence becomes
            its multi-class target value.

    Returns:
        A 5-tuple ``(DX, DY, DZ, feature_names, DAPKs)``:

        * ``DX`` -- feature matrix as a numpy array.
        * ``DY`` -- numpy array with the label index of every sample.
        * ``DZ`` -- list of binary targets: 0 for the ``"benign"`` label,
          1 for every other label.
        * ``feature_names`` -- names produced by ``DictVectorizer`` when the
          vectors are dicts, otherwise ``None``.
        * ``DAPKs`` -- list of APK identifiers, aligned with the rows of ``DX``.
    """
    DX = []
    DY = []
    DZ = []
    DAPKs = []
    for class_index, label in enumerate(labels):
        # Context manager so the file handle is closed deterministically.
        with open(f"{vectors_dir}/{label}-vectors.json", "r", encoding="utf-8") as fh:
            vectors = json.load(fh)
        DAPKs.extend(apk for apk, _ in vectors)
        DX.extend(vec for _, vec in vectors)
        DY.extend([class_index] * len(vectors))
        # Exactly one binary target per sample: benign -> 0, anything else -> 1.
        DZ.extend([0 if label == "benign" else 1] * len(vectors))

    # Stays None for plain (non-dict) vectors, which carry no feature names.
    feature_names = None
    if DX and isinstance(DX[0], dict):
        # Dict-shaped vectors are expanded into a dense matrix, one column per key.
        vectorizer = DictVectorizer(sparse=False)
        DX = vectorizer.fit_transform(DX)
        feature_names = vectorizer.feature_names_
    DX = np.asarray(DX)
    return DX, np.array(DY), DZ, feature_names, DAPKs
def classify_fold(train_X, train_Y, test_X, test_Y,
                  labels, feature_names,
                  n_estimators, max_features, max_depth, report=False):
    """Train a random forest on one fold and evaluate it on the held-out part.

    Args:
        train_X, train_Y: Training features and targets.
        test_X, test_Y: Held-out features and targets.
        labels: Full label collection; only its length is used, to fix the
            width of the returned probability matrix.
        feature_names: Index for the feature-importance table (may be ``None``).
        n_estimators, max_features, max_depth: Forwarded to
            ``RandomForestClassifier``.
        report: When true, return the detailed tuple instead of the score.

    Returns:
        ``classifier.score(test_X, test_Y)`` when ``report`` is false.
        Otherwise an 8-tuple ``(None, None, None, None, feature_importances,
        test_Y, pred_Y, pred_proba_Y)``; the four leading ``None`` values are
        placeholders where precision/recall/F-score/support used to be.
    """
    classifier = RandomForestClassifier(
        max_depth=max_depth,
        n_estimators=n_estimators,
        max_features=max_features,
        n_jobs=6,
        random_state=33,  # fixed seed so folds are reproducible
    )
    classifier.fit(train_X, train_Y)

    pred_Y = classifier.predict(test_X)
    pred_proba_Y = classifier.predict_proba(test_X)

    # The forest only emits one probability column per class it saw during
    # training; right-pad with zero columns so every fold has len(labels) columns.
    # NOTE(review): this is only aligned if the unseen classes are the
    # highest-numbered ones (columns follow classifier.classes_) -- confirm.
    n_missing = len(labels) - pred_proba_Y.shape[1]
    if n_missing > 0:
        padding = np.zeros((pred_proba_Y.shape[0], n_missing))
        pred_proba_Y = np.hstack([pred_proba_Y, padding])

    if not report:
        return classifier.score(test_X, test_Y)

    # NOTE(review): the column name and sort order were reconstructed from a
    # truncated statement -- confirm against the consumers of this table.
    feature_importances = pd.DataFrame(
        classifier.feature_importances_,
        index=feature_names,
        columns=["importance"],
    ).sort_values("importance", ascending=False)
    return (None, None, None, None, feature_importances, test_Y, pred_Y, pred_proba_Y)
def classify(DX, DY, labels, feature_names, DAPKs, n_estimators, max_features, split_ratio, max_depth, report=False):
    """Shuffle the data, train a random forest on the head and test on the tail.

    Args:
        DX, DY: Feature matrix and targets.
        labels: Label values forwarded to ``precision_recall_fscore_support``.
        feature_names: Index for the feature-importance table (may be ``None``).
        DAPKs: Per-sample APK identifiers, shuffled together with ``DX``/``DY``.
        n_estimators, max_features, max_depth: Forwarded to
            ``RandomForestClassifier``.
        split_ratio: Fraction of the shuffled samples used for training.
        report: When true, return the detailed tuple instead of the score.

    Returns:
        ``classifier.score(test_X, test_Y)`` when ``report`` is false.
        Otherwise the 4-tuple from ``precision_recall_fscore_support``
        extended with ``(feature_importances, test_Y, pred_Y, pred_proba_Y,
        test_APKs)``.
    """
    # Unseeded shuffle: every call draws a different train/test split.
    X, Y, APKs = shuffle(DX, DY, DAPKs)

    train_size = int(len(Y) * split_ratio)
    train_X, test_X = X[:train_size], X[train_size:]
    train_Y, test_Y = Y[:train_size], Y[train_size:]
    test_APKs = APKs[train_size:]

    classifier = RandomForestClassifier(
        max_depth=max_depth,
        n_estimators=n_estimators,
        max_features=max_features,
        n_jobs=6,
    )
    classifier.fit(train_X, train_Y)

    if not report:
        return classifier.score(test_X, test_Y)

    pred_Y = classifier.predict(test_X)
    pred_proba_Y = classifier.predict_proba(test_X)
    # NOTE(review): the column name and sort order were reconstructed from a
    # truncated statement -- confirm against the consumers of this table.
    feature_importances = pd.DataFrame(
        classifier.feature_importances_,
        index=feature_names,
        columns=["importance"],
    ).sort_values("importance", ascending=False)
    metrics = precision_recall_fscore_support(test_Y, pred_Y, labels=labels)
    return metrics + (feature_importances, test_Y, pred_Y, pred_proba_Y, test_APKs)
def classify_knn(DX, DY, labels, split_ratio=0.7, n_neighbors=3, report=False):
    """Shuffle the data and evaluate a k-nearest-neighbours classifier.

    Args:
        DX, DY: Feature matrix and targets.
        labels: Label values forwarded to ``precision_recall_fscore_support``.
        split_ratio: Fraction of the shuffled samples used for training.
        n_neighbors: Forwarded to ``KNeighborsClassifier``.
        report: When true, return per-label metrics instead of the score.

    Returns:
        ``classifier.score(test_X, test_Y)`` when ``report`` is false,
        otherwise the result of ``precision_recall_fscore_support``.
    """
    # Unseeded shuffle: every call draws a different train/test split.
    X, Y = shuffle(DX, DY)

    train_size = int(len(Y) * split_ratio)
    train_X, test_X = X[:train_size], X[train_size:]
    train_Y, test_Y = Y[:train_size], Y[train_size:]

    classifier = KNeighborsClassifier(n_neighbors=n_neighbors)
    classifier.fit(train_X, train_Y)

    if not report:
        return classifier.score(test_X, test_Y)
    pred_Y = classifier.predict(test_X)
    return precision_recall_fscore_support(test_Y, pred_Y, labels=labels)
def classify_svm(DX, DY, labels, kernel="rbf", split_ratio=0.7, report=False):
    """Shuffle the data and evaluate a support-vector classifier.

    Args:
        DX, DY: Feature matrix and targets.
        labels: Label values forwarded to ``precision_recall_fscore_support``.
        kernel: Forwarded to ``svm.SVC``.
        split_ratio: Fraction of the shuffled samples used for training.
        report: When true, return metrics plus the fitted model.

    Returns:
        ``classifier.score(test_X, test_Y)`` when ``report`` is false.
        Otherwise a pair ``(metrics, classifier)`` where ``metrics`` is the
        result of ``precision_recall_fscore_support`` and ``classifier`` is
        the fitted ``SVC``.
    """
    # Unseeded shuffle: every call draws a different train/test split.
    X, Y = shuffle(DX, DY)

    train_size = int(len(Y) * split_ratio)
    train_X, test_X = X[:train_size], X[train_size:]
    train_Y, test_Y = Y[:train_size], Y[train_size:]

    classifier = svm.SVC(kernel=kernel)
    classifier.fit(train_X, train_Y)

    if not report:
        return classifier.score(test_X, test_Y)
    pred_Y = classifier.predict(test_X)
    return precision_recall_fscore_support(test_Y, pred_Y, labels=labels), classifier
def matrix(DX, DY, labels):
    """Grid-search random-forest settings and plot mean score per setting.

    For every ``(n_estimators, max_features)`` pair from a fixed candidate
    list, ``classify`` is run 10 times on a 70/30 split and the scores are
    averaged. One line per ``n_estimators`` value is drawn on the current
    matplotlib axes (x: ``max_features``, y: mean score).

    Args:
        DX: 2-D feature matrix; its second dimension bounds the candidates.
        DY: Targets aligned with ``DX``.
        labels: Forwarded to ``classify``.
    """
    # `plt` is not imported at module level (only `cm` is), so import it here.
    import matplotlib.pyplot as plt

    candidates = [2, 20, 60, 80, 100, 160, 200]
    n_repeats = 10
    _, n_feats = DX.shape

    # classify() shuffles an APK list alongside the data; scoring does not use
    # it, so a positional placeholder of matching length is sufficient.
    placeholder_apks = np.arange(len(DY))

    estimate_scores = {}
    for n_estimators in candidates:
        # NOTE(review): the tree count is capped by the feature count, as in
        # the original code -- confirm this bound is intended.
        if n_estimators > n_feats:
            continue
        estimate_scores[n_estimators] = {}
        for max_features in candidates:
            if max_features > n_feats:
                continue
            scores = [
                classify(DX, DY, labels=labels, feature_names=None,
                         DAPKs=placeholder_apks, n_estimators=n_estimators,
                         max_features=max_features, split_ratio=0.7,
                         max_depth=None)
                for _ in range(n_repeats)
            ]
            estimate_scores[n_estimators][max_features] = np.mean(scores)

    color = cm.rainbow(np.linspace(0, 1, len(estimate_scores)))
    for color_index, (n_estimators, scores) in enumerate(estimate_scores.items()):
        xs, ys = zip(*scores.items())
        plt.plot(xs, ys, c=color[color_index], label=str(n_estimators))
def avg_eval(DX, DY, DAPKs, combined_labels, combined_labels_index, feature_names,
             max_features, n_estimators, n_fold, max_depth):
    """Run k-fold cross-validation with ``classify_fold`` and pool the results.

    Args:
        DX, DY: Feature matrix and targets; must support integer-array indexing.
        DAPKs: Per-sample APK identifiers aligned with ``DX``.
        combined_labels: Not used by this function.
        combined_labels_index: Passed to ``classify_fold`` as its ``labels``.
        feature_names: Passed to ``classify_fold``.
        max_features, n_estimators, max_depth: Random-forest settings.
        n_fold: Number of folds.

    Returns:
        A 6-tuple ``(None, feature_importances_s, y_true, y_pred,
        y_pred_proba, APKs_test_all)``: a ``None`` placeholder, the list of
        per-fold feature-importance tables, the true labels, predictions and
        class probabilities concatenated over all folds, and the APK
        identifiers of the test samples in the same order.
    """
    feature_importances_s = []
    y_true_all = []
    y_pred_all = []
    y_pred_proba_all = []
    APKs_test_all = []

    # Built once; each fold only selects its test rows from it.
    apk_column = pd.DataFrame(DAPKs, columns=["APK"])["APK"]

    # Seeded so the fold assignment is reproducible across runs.
    kf = KFold(n_splits=n_fold, shuffle=True, random_state=36)
    for train_index, test_index in kf.split(DX):
        _, _, _, _, feature_importances, y_true, y_pred, y_pred_proba = \
            classify_fold(DX[train_index], DY[train_index], DX[test_index], DY[test_index],
                          combined_labels_index, feature_names,
                          max_features=max_features, n_estimators=n_estimators,
                          max_depth=max_depth, report=True)
        feature_importances_s.append(feature_importances)
        y_true_all.append(y_true)
        y_pred_all.append(y_pred)
        y_pred_proba_all.append(y_pred_proba)
        APKs_test_all += list(apk_column.loc[test_index])

    return None, feature_importances_s, \
        np.concatenate(y_true_all), np.concatenate(y_pred_all), \
        np.concatenate(y_pred_proba_all), APKs_test_all
# NOTE: K-means isn't very good
# kmeans = KMeans(n_clusters=len(combined_labels))
# y_pred = kmeans.fit_predict(DX)
# mat = confusion_matrix(DY, y_pred).T
# mat
# size_array = np.array([n for l, n in label_stat])
# size_array
# mat = (mat / size_array)
# sn.heatmap(mat,
# xticklabels=labels,
# yticklabels=range(len(labels)))
# plt.xlabel('true label')
# plt.ylabel('predicted label')
def load_vt_stat(apks, metadata_paths=None):
    """Look up VirusTotal verdicts for the given APKs.

    Args:
        apks: APK identifiers (hashable) to look up, in the desired output order.
        metadata_paths: Paths of JSON metadata files readable by ``load_json``.
            Each file must yield records with an ``"apk"`` key and, for
            requested APKs, a ``"virustotal"`` report containing
            ``"positives"``, ``"scans"`` and ``"total"``. Defaults to no files.

    Returns:
        A pair ``(ret, ret_major)`` of 0/1 lists aligned with ``apks``:
        ``ret`` is 1 when at least one scanner flagged the APK, ``ret_major``
        is 1 when more than ``int(len(scans) * 0.5)`` scanners flagged it.

    Raises:
        KeyError: If a requested APK has no record in any metadata file.
        AssertionError: If a matching record has a malformed VirusTotal report.
    """
    if metadata_paths is None:
        # NOTE(review): the visible code never populates this list (an earlier
        # glob over "all_samples_eval/*.test.json" is disabled), so without an
        # explicit argument any non-empty `apks` fails the lookup below.
        metadata_paths = []

    wanted = set(apks)  # O(1) membership instead of scanning `apks` per record
    apks_is_malicious = {}
    apks_is_malicious_major = {}
    for metadata_path in metadata_paths:
        for test_data in load_json(metadata_path):
            apk = test_data["apk"]
            if apk not in wanted:
                continue
            assert "virustotal" in test_data, apk
            vt_report = test_data["virustotal"]
            assert "positives" in vt_report and "scans" in vt_report
            n_scans = len(vt_report["scans"])
            assert n_scans == vt_report["total"]
            positives = vt_report["positives"]
            apks_is_malicious[apk] = positives >= 1
            apks_is_malicious_major[apk] = positives > int(n_scans * 0.5)

    missing = [apk for apk in apks if apk not in apks_is_malicious]
    if missing:
        # Same exception type as the plain dict lookup, but with context.
        raise KeyError(
            f"no VirusTotal metadata found for {len(missing)} APK(s), e.g. {missing[0]!r}"
        )

    ret = [int(apks_is_malicious[apk]) for apk in apks]
    ret_major = [int(apks_is_malicious_major[apk]) for apk in apks]
    return ret, ret_major
# vt_total += 1
# if vt_total > 0:
# vt_stat[label] = {
# "vt_frac": vt_frac_positives / vt_total,
# "vt_exist": vt_exist_positives / vt_total,
# "vt_major": vt_major_positives / vt_total,
# "vt_support": vt_total,
# }
# for label, samples in bin_samples.items():
# vt_frac_positives = 0
# vt_exist_positives = 0
# vt_major_positives = 0
# vt_total = 0
# for test_data in samples:
# if "virustotal" in test_data:
# vt_report = test_data["virustotal"]
# if "positives" in vt_report and "scans" in vt_report:
# vt_frac_positives += int(vt_report["positives"]) / len(vt_report["scans"])
# vt_exist_positives += int(vt_report["positives"] > 1)
# vt_major_positives += int(vt_report["positives"] > len(vt_report["scans"]) * 0.5)
# vt_total += 1
# if vt_total > 0:
# vt_stat_bin[label] = {
# "vt_frac": vt_frac_positives / vt_total,
# "vt_exist": vt_exist_positives / vt_total,
# "vt_major": vt_major_positives / vt_total,
# "vt_support": vt_total,
# }
# vt_result_df = pd.DataFrame(vt_stat).T
# vt_stat_bin_df = pd.DataFrame(vt_stat_bin).T
# vt_stat_bin_df = vt_stat_bin_df.rename(index={True: "benign", False: "malicious"}).drop(["vt_frac", "vt_major"], axis=1)
# return vt_result_df, vt_stat_bin_df
# FIXME: PCA is not good
# from sklearn.decomposition import PCA
# from mpl_toolkits.mplot3d import Axes3D
# pca = PCA(n_components=3)
# pca_2 = PCA(n_components=2)
# components = pca.fit_transform(DX)
# components_2 = pca_2.fit_transform(DX)
# result = pd.DataFrame(components, columns=['PCA%i' % i for i in range(3)])
# print(result.shape)
# result_2 = pd.DataFrame(components_2, columns=['PCA%i' % i for i in range(2)])
# print(result_2.shape)
# def plot(color_map, DY, labels):
# colors = [color_map[y] for y in DY]
## Plot initialisation
# fig = plt.figure(figsize=(8, 6))
# ax = Axes3D(fig)
# ax.scatter(result['PCA0'], result['PCA1'], result['PCA2'], c=colors, cmap="Set2_r", s=60)
## make simple, bare axis lines through space:
# xAxisLine = ((min(result['PCA0']), max(result['PCA0'])), (0, 0), (0,0))
# ax.plot(xAxisLine[0], xAxisLine[1], xAxisLine[2], 'r')
# yAxisLine = ((0, 0), (min(result['PCA1']), max(result['PCA1'])), (0,0))
# ax.plot(yAxisLine[0], yAxisLine[1], yAxisLine[2], 'r')
# zAxisLine = ((0, 0), (0,0), (min(result['PCA2']), max(result['PCA2'])))
# ax.plot(zAxisLine[0], zAxisLine[1], zAxisLine[2], 'r')
## label the axes
# ax.set_xlabel("PC1")
# ax.set_ylabel("PC2")
# ax.set_zlabel("PC3")
# markers = [plt.Line2D([0,0],[0,0], color=color, marker='o', linestyle='') for color in color_map]
# plt.legend(markers, labels, numpoints=1)
# color_map = cm.rainbow(np.linspace(0, 1, len(combined_labels)))
# plot(color_map, DY, combined_labels)
# color_map_2 = cm.rainbow(np.linspace(0, 1, 2))
# plot(color_map_2, DZ, ["benign", "malicious"])
# DX_pca = result
# DX_pca.shape
# results, classifier = classify_svm(result_2, DZ, labels=[0,1], report=True)
# results
# plt.figure(1, figsize=(4, 3))
# colors_2 = [color_map_2[y] for y in DZ]
# plt.scatter(result_2['PCA0'], result['PCA1'], c=colors_2, zorder=10,
# edgecolors='k')
# plt.scatter(classifier.support_vectors_[:, 0], classifier.support_vectors_[:, 1], s=80,
# facecolors='none', zorder=10, edgecolors='k')
# plt.axis('tight')
# x_min = -4
# x_max = 4
# y_min = -4
# y_max = 4
# XX, YY = np.mgrid[x_min:x_max:200j, y_min:y_max:200j]
# Z = classifier.decision_function(np.c_[XX.ravel(), YY.ravel()])
# Put the result into a color plot
# Z = Z.reshape(XX.shape)
# plt.figure(1, figsize=(4, 3))
# plt.pcolormesh(XX, YY, Z > 0,
# plt.contour(XX, YY, Z, colors=['k', 'k', 'k'], linestyles=['--', '-', '--'],
# levels=[-1, -.5, 0, .5, 1])
# plt.xlim(x_min, x_max)
# plt.ylim(y_min, y_max)
# plt.xticks(())
# plt.yticks(())
