Skip to content

Instantly share code, notes, and snippets.

@bwall bwall/benchmarking.py Secret
Created Nov 3, 2015

Embed
What would you like to do?
Python script used when collecting benchmark results for ssDeep optimizations
from __future__ import division
import time
import ssdeep
import sqlite3
import datetime
from struct import unpack
import base64
def log(message):
message = "{0}:\t{1}".format(time.time(), message)
print message
with open("results.log", "a") as f:
f.write(message + "\n")
class BenchmarkStrategy:
def __init__(self, name, hash_list_path, database_increment):
self.name = name
self.hash_list_path = hash_list_path
self.database = None
self.database_increment = database_increment
def build_database(self, size):
if self.database is not None:
self.database.close()
self.database = None
self._build_database(size)
def run_benchmarks(self):
benchmark_results = []
log("Starting benchmarks for {0}".format(self.name))
with open(self.hash_list_path, "r") as f:
i = 0
for i, line in enumerate(f):
pass
hash_count = i + 1
log("Starting lookup benchmarks for {0}".format(self.name))
# Do single look up tests
database_sizes = []
database_size = self.database_increment
while database_size < hash_count:
database_sizes.append(database_size)
database_size += self.database_increment
database_sizes.append(hash_count)
# Do single look up tests
for database_size in database_sizes:
for i in xrange(5):
log("Starting lookup benchmarks for {0} with size {1} iteration {2}".format(self.name, database_size, i))
self.build_database(database_size)
start_time = time.time()
self.run_comparisons(False)
end_time = time.time()
benchmark_results.append((self.name, "lookup", database_size, i, end_time - start_time))
database_size += self.database_increment
log("Starting clustering benchmarks for {0}".format(self.name))
for database_size in database_sizes:
for i in xrange(5):
log("Starting clustering benchmarks for {0} with size {1} iteration {2}".format(self.name, database_size, i))
self.build_database(database_size)
start_time = time.time()
self.run_clustering(False)
end_time = time.time()
benchmark_results.append((self.name, "cluster", database_size, i, end_time - start_time))
database_size += self.database_increment
return benchmark_results
def run_test_vectors(self):
"""
Tests the validity of methods by testing against the bare bone method
:return: Boolean indicating if tests passed or not
"""
log("Building database for testing {0}".format(self.name))
self.build_database(500)
plain = PlainMethod(self.hash_list_path, self.database_increment)
plain.build_database(500)
log("Computing comparison test vectors")
plain_comp = set(plain.run_comparisons(True, 500))
log("Computing comparison {0} vectors".format(self.name))
my_comp = set(self.run_comparisons(True, 500))
log("Comparing comparison results")
if plain_comp != my_comp:
log("Comparison results failed for {0}".format(self.name))
log("Missing {0}".format(plain_comp - my_comp))
log("Extra {0}".format(my_comp - plain_comp))
return False
log("Computing cluster test vectors")
plain_comp = set(plain.run_clustering(True))
log("Computing cluster {0} vectors".format(self.name))
my_comp = set(self.run_clustering(True))
log("Comparing cluster results")
if plain_comp != my_comp:
log("Cluster results failed for {0}".format(self.name))
log("Missing {0}".format(plain_comp - my_comp))
log("Extra {0}".format(my_comp - plain_comp))
return False
log("All testing with {0} passed".format(self.name))
return True
def run_comparisons(self, save_results, count=1000):
hashes = []
with open(self.hash_list_path, "r") as f:
for line in f:
line = line.strip()
hashes.append(line)
if len(hashes) >= count:
break
return self._run_comparisons(save_results, hashes)
def run_clustering(self, save_results):
"""
For clustering, we are looking for all comparisons greater than 0
:param save_results:
:return:
"""
return self._run_clustering(save_results)
class PlainMethod(BenchmarkStrategy):
def __init__(self, hash_list_path, database_increment):
BenchmarkStrategy.__init__(self, "plain", hash_list_path, database_increment)
def _build_database(self, size):
self.database = sqlite3.connect(":memory:")
c = self.database.cursor()
c.execute("CREATE TABLE hashes (hash VARCHAR UNIQUE)")
inserted = set()
with open(self.hash_list_path, "r") as f:
for line in f:
line = line.strip()
c.execute("INSERT OR IGNORE INTO hashes VALUES (?)", (line,))
inserted.add(line)
if len(inserted) >= size:
break
c.close()
def _run_comparisons(self, save_results, hashes_to_test):
c = self.database.cursor()
if save_results:
results = []
for h in hashes_to_test:
for line in c.execute("SELECT hash FROM hashes"):
r = ssdeep.compare(h, line[0])
if r > 0:
results.append((h, line[0], r))
c.close()
return results
else:
for h in hashes_to_test:
for line in c.execute("SELECT hash FROM hashes"):
ssdeep.compare(h, line[0])
c.close()
def _run_clustering(self, save_results):
c = self.database.cursor()
c2 = self.database.cursor()
if save_results:
results = []
for line in c.execute("SELECT hash FROM hashes"):
for line2 in c2.execute("SELECT hash FROM hashes"):
r = ssdeep.compare(line[0], line2[0])
if r > 0:
results.append((line[0], line2[0], r))
c.close()
c2.close()
return results
else:
for line in c.execute("SELECT hash FROM hashes"):
for line2 in c2.execute("SELECT hash FROM hashes"):
ssdeep.compare(line[0], line2[0])
c.close()
c2.close()
class ChunkSizeMethod(BenchmarkStrategy):
def __init__(self, hash_list_path, database_increment):
BenchmarkStrategy.__init__(self, "chunksize", hash_list_path, database_increment)
def _build_database(self, size):
self.database = sqlite3.connect(":memory:")
c = self.database.cursor()
c.execute("CREATE TABLE hashes (chunksize INT, hash VARCHAR UNIQUE)")
c.execute('''CREATE INDEX full_chunk_sizes ON hashes (chunksize ASC)''')
inserted = set()
with open(self.hash_list_path, "r") as f:
for line in f:
line = line.strip()
c.execute("INSERT OR IGNORE INTO hashes VALUES (?, ?)", (int(line.split(":")[0]), line))
inserted.add(line)
if len(inserted) >= size:
break
c.close()
def _run_comparisons(self, save_results, hashes_to_test):
c = self.database.cursor()
if save_results:
results = []
for h in hashes_to_test:
chunksize = int(h.split(":")[0])
for line in c.execute("SELECT hash FROM hashes WHERE chunksize=? or chunksize=? or chunksize=?", (chunksize, chunksize * 2, chunksize / 2)):
r = ssdeep.compare(h, line[0])
if r > 0:
results.append((h, line[0], r))
c.close()
return results
else:
for h in hashes_to_test:
chunksize = int(h.split(":")[0])
for line in c.execute("SELECT hash FROM hashes WHERE chunksize=? or chunksize=? or chunksize=?", (chunksize, chunksize * 2, chunksize / 2)):
ssdeep.compare(h, line[0])
c.close()
def _run_clustering(self, save_results):
c = self.database.cursor()
c2 = self.database.cursor()
if save_results:
results = []
for line in c.execute("SELECT hash FROM hashes"):
chunksize = int(line[0].split(":")[0])
for line2 in c2.execute("SELECT hash FROM hashes WHERE chunksize=? or chunksize=? or chunksize=?", (chunksize, chunksize * 2, chunksize / 2)):
r = ssdeep.compare(line[0], line2[0])
if r > 0:
results.append((line[0], line2[0], r))
c.close()
c2.close()
return results
else:
for line in c.execute("SELECT hash FROM hashes"):
chunksize = int(line[0].split(":")[0])
for line2 in c2.execute("SELECT hash FROM hashes WHERE chunksize=? or chunksize=? or chunksize=?", (chunksize, chunksize * 2, chunksize / 2)):
ssdeep.compare(line[0], line2[0])
c.close()
c2.close()
class ChunkSizeWithClusterOptimizationMethod(BenchmarkStrategy):
def __init__(self, hash_list_path, database_increment):
BenchmarkStrategy.__init__(self, "chunksizewithclusteroptimization", hash_list_path, database_increment)
def _build_database(self, size):
self.database = sqlite3.connect(":memory:")
c = self.database.cursor()
c.execute("CREATE TABLE hashes (chunksize INT, hash VARCHAR UNIQUE)")
c.execute('''CREATE INDEX full_chunk_sizes ON hashes (chunksize ASC)''')
inserted = set()
with open(self.hash_list_path, "r") as f:
for line in f:
line = line.strip()
c.execute("INSERT OR IGNORE INTO hashes VALUES (?, ?)", (int(line.split(":")[0]), line))
inserted.add(line)
if len(inserted) >= size:
break
c.close()
def _run_comparisons(self, save_results, hashes_to_test):
c = self.database.cursor()
if save_results:
results = []
for h in hashes_to_test:
chunksize = int(h.split(":")[0])
for line in c.execute("SELECT hash FROM hashes WHERE chunksize=? or chunksize=? or chunksize=?", (chunksize, chunksize * 2, chunksize / 2)):
r = ssdeep.compare(h, line[0])
if r > 0:
results.append((h, line[0], r))
c.close()
return results
else:
for h in hashes_to_test:
chunksize = int(h.split(":")[0])
for line in c.execute("SELECT hash FROM hashes WHERE chunksize=? or chunksize=? or chunksize=?", (chunksize, chunksize * 2, chunksize / 2)):
ssdeep.compare(h, line[0])
c.close()
def _run_clustering(self, save_results):
c = self.database.cursor()
c2 = self.database.cursor()
if save_results:
results = []
last_chunk = []
last_chunksize = 0
chunk = []
for chunksize in c.execute("SELECT chunksize FROM hashes GROUP BY chunksize"):
chunksize = chunksize[0]
for line2 in c2.execute("SELECT hash FROM hashes WHERE chunksize=?", (chunksize,)):
chunk.append(line2[0])
for a in chunk:
r = ssdeep.compare(line2[0], a)
if r > 0:
results.append((line2[0], a, r))
results.append((a, line2[0], r))
if last_chunksize * 2 == chunksize:
for a in last_chunk:
r = ssdeep.compare(line2[0], a)
if r > 0:
results.append((line2[0], a, r))
results.append((a, line2[0], r))
last_chunk = chunk
chunk = []
last_chunksize = chunksize
c.close()
c2.close()
return results
else:
last_chunk = []
chunk = []
last_chunksize = 0
for chunksize in c.execute("SELECT chunksize FROM hashes GROUP BY chunksize"):
chunksize = chunksize[0]
for line2 in c2.execute("SELECT hash FROM hashes WHERE chunksize=?", (chunksize,)):
chunk.append(line2[0])
for a in chunk:
ssdeep.compare(line2[0], a)
if last_chunksize * 2 == chunksize:
for a in last_chunk:
ssdeep.compare(line2[0], a)
last_chunk = chunk
chunk = []
last_chunksize = chunksize
c.close()
c2.close()
class IntegerDBMethod(BenchmarkStrategy):
def __init__(self, hash_list_path, database_increment):
BenchmarkStrategy.__init__(self, "integerdb", hash_list_path, database_increment)
@staticmethod
def get_all_7_char_chunks(h):
return set((unpack("<Q", base64.b64decode(h[i:i+7] + "=") + "\x00\x00\x00")[0] for i in xrange(len(h) - 6)))
@staticmethod
def preprocess_hash(h):
block_size, h = h.split(":", 1)
block_size = int(block_size)
# Reduce any sequence of the same char greater than 3 to 3
for c in set(list(h)):
while c * 4 in h:
h = h.replace(c * 4, c * 3)
block_data, double_block_data = h.split(":")
return block_size, IntegerDBMethod.get_all_7_char_chunks(block_data), IntegerDBMethod.get_all_7_char_chunks(double_block_data)
def _build_database(self, size):
self.database = sqlite3.connect(":memory:")
c = self.database.cursor()
c.execute('''CREATE TABLE ssdeep_hashes (hash_id INTEGER PRIMARY KEY, hash VARCHAR UNIQUE)''')
c.execute('''CREATE TABLE full_chunk (hash_id INTEGER, chunk_size INTEGER, chunk INTEGER)''')
c.execute('''CREATE INDEX full_chunk_sizes ON full_chunk (chunk_size ASC, chunk ASC)''')
inserted = set()
with open(self.hash_list_path, "r") as f:
for line in f:
h = line.strip()
p = IntegerDBMethod.preprocess_hash(h)
c.execute("INSERT OR IGNORE INTO ssdeep_hashes (hash) VALUES (?)", (h,))
hash_id = None
for id in c.execute("SELECT hash_id FROM ssdeep_hashes WHERE hash=?", (h,)):
hash_id = id[0]
break
# Insert full chunk
c.executemany("INSERT INTO full_chunk (hash_id, chunk_size, chunk) VALUES (?, ?, ?)",
((hash_id, p[0], x) for x in p[1]))
# Insert double chunk
c.executemany("INSERT INTO full_chunk (hash_id, chunk_size, chunk) VALUES (?, ?, ?)",
((hash_id, p[0] * 2, x) for x in p[2]))
inserted.add(h)
if len(inserted) >= size:
break
c.close()
def _run_comparisons(self, save_results, hashes_to_test):
c = self.database.cursor()
preprocess_hash = IntegerDBMethod.preprocess_hash
if save_results:
results = []
for h in hashes_to_test:
p = preprocess_hash(h)
match_ids = set()
match_ids.add(h)
for chunk in p[1]:
for id in c.execute("SELECT S.hash FROM full_chunk as F JOIN ssdeep_hashes AS S ON F.hash_id=S.hash_id WHERE F.chunk_size=? and F.chunk=?", (p[0], chunk)):
match_ids.add(id[0])
# Check double chunk size
for chunk in p[2]:
for id in c.execute("SELECT S.hash FROM full_chunk as F JOIN ssdeep_hashes AS S ON F.hash_id=S.hash_id WHERE F.chunk_size=? and F.chunk=?", (p[0] * 2, chunk)):
match_ids.add(id[0])
for i in match_ids:
r = ssdeep.compare(h, i)
if r > 0:
results.append((h, i, r))
c.close()
return results
else:
for h in hashes_to_test:
p = preprocess_hash(h)
match_ids = set()
match_ids.add(h)
for chunk in p[1]:
for id in c.execute("SELECT S.hash FROM full_chunk as F JOIN ssdeep_hashes AS S ON F.hash_id=S.hash_id WHERE F.chunk_size=? and F.chunk=?", (p[0], chunk)):
match_ids.add(id[0])
# Check double chunk size
for chunk in p[2]:
for id in c.execute("SELECT S.hash FROM full_chunk as F JOIN ssdeep_hashes AS S ON F.hash_id=S.hash_id WHERE F.chunk_size=? and F.chunk=?", (p[0] * 2, chunk)):
match_ids.add(id[0])
for i in match_ids:
ssdeep.compare(h, i)
c.close()
def _run_clustering(self, save_results):
c = self.database.cursor()
c2 = self.database.cursor()
preprocess_hash = IntegerDBMethod.preprocess_hash
if save_results:
results = []
for line in c.execute("SELECT hash FROM ssdeep_hashes"):
h = line[0]
p = preprocess_hash(h)
match_ids = set()
match_ids.add(h)
for chunk in p[1]:
for id in c2.execute("SELECT S.hash FROM full_chunk as F JOIN ssdeep_hashes AS S ON F.hash_id=S.hash_id WHERE F.chunk_size=? and F.chunk=?", (p[0], chunk)):
match_ids.add(id[0])
# Check double chunk size
for chunk in p[2]:
for id in c2.execute("SELECT S.hash FROM full_chunk as F JOIN ssdeep_hashes AS S ON F.hash_id=S.hash_id WHERE F.chunk_size=? and F.chunk=?", (p[0] * 2, chunk)):
match_ids.add(id[0])
for i in match_ids:
r = ssdeep.compare(h, i)
if r > 0:
results.append((h, i, r))
results.append((i, h, r))
c.close()
c2.close()
return results
else:
for line in c.execute("SELECT hash FROM ssdeep_hashes"):
h = line[0]
p = preprocess_hash(h)
match_ids = set()
match_ids.add(h)
for chunk in p[1]:
for id in c2.execute("SELECT S.hash FROM full_chunk as F JOIN ssdeep_hashes AS S ON F.hash_id=S.hash_id WHERE F.chunk_size=? and F.chunk=?", (p[0], chunk)):
match_ids.add(id[0])
# Check double chunk size
for chunk in p[2]:
for id in c2.execute("SELECT S.hash FROM full_chunk as F JOIN ssdeep_hashes AS S ON F.hash_id=S.hash_id WHERE F.chunk_size=? and F.chunk=?", (p[0] * 2, chunk)):
match_ids.add(id[0])
for i in match_ids:
ssdeep.compare(h, i)
c.close()
c2.close()
hash_path = "t"
increment = 1000
methods = []
#methods.append(PlainMethod(hash_path, increment))
#methods.append(ChunkSizeMethod(hash_path, increment))
#methods.append(ChunkSizeWithClusterOptimizationMethod(hash_path, increment))
methods.append(IntegerDBMethod(hash_path, increment))
for method in methods:
if not method.run_test_vectors():
raise Exception()
final_results = []
results = {}
for method in methods:
results[method.name] = {}
for i in method.run_benchmarks():
if i[1] not in results[method.name]:
results[method.name][i[1]] = {}
if i[2] not in results[method.name][i[1]]:
results[method.name][i[1]][i[2]] = []
results[method.name][i[1]][i[2]].append(i[4])
for t in results[method.name]:
for db_size in results[method.name][t].keys():
average_time = sum(results[method.name][t][db_size]) / len(results[method.name][t][db_size])
average_time_per_hash = average_time / db_size
final_results.append((method.name, t, db_size, average_time, average_time_per_hash))
for b in ["lookup", "cluster"]:
for method in methods:
for i in sorted([k for k in final_results if k[0] == method.name and b == k[1]], key=lambda l: l[2]):
log(", ".join([str(s) for s in i]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.