import os
import json

import numpy as np
import tlsh  # provided by the `py-tlsh` package
from joblib import Parallel, delayed, parallel_backend
from sklearn.metrics import pairwise_distances_chunked
from tqdm import tqdm

from vpt import VantagePointTree


# ----------------------------------------------------------------------------
# demo function: build TLSH hashes for a list of files
# (collects exact duplicates and hashing failures separately)


def build_hashes(files):
    tlsh_lookup = dict()
    tlsh_hashers = list()
    duplicates = dict()
    failures = list()

    for file in files:
        hasher = tlsh.Tlsh()
        with open(file, "rb") as fp:
            for line in fp:
                hasher.update(line)
        try:
            hasher.final()
        except ValueError:
            # raised for input that is too short or not varied enough for TLSH
            pass

        if not hasher.is_valid:
            failures.append(file)
            continue

        digest = hasher.hexdigest()
        if digest in tlsh_lookup:
            # exact duplicate digest: record both files, keep only the first hasher
            entry = tlsh_lookup[digest]
            if digest not in duplicates:
                duplicates[digest] = [entry]
            duplicates[digest].append(file)
            continue

        tlsh_lookup[digest] = file
        tlsh_hashers.append(hasher)

    return tlsh_lookup, tlsh_hashers, duplicates, failures
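

# A minimal sketch (hypothetical data, never called) of the incremental py-tlsh
# API used in `build_hashes` above. TLSH rejects very short or low-variation
# input, in which case `final()` raises a ValueError.
def _example_tlsh_api():
    data_a = bytes(range(256)) * 2
    data_b = bytes(range(0, 256, 2)) * 4

    hasher_a = tlsh.Tlsh()
    hasher_a.update(data_a)
    hasher_a.final()

    hasher_b = tlsh.Tlsh()
    hasher_b.update(data_b)
    hasher_b.final()

    # the distance is 0 for identical digests and grows the more the inputs differ
    print(hasher_a.hexdigest())
    print(hasher_a.diff(hasher_b))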


# ----------------------------------------------------------------------------
# brute-force distance computation with sklearn `pairwise_distances_chunked`
# (can be run in parallel via a joblib backend)


def find_near_duplicates(tlsh_hashers, max_dist=100):
    Xlen = len(tlsh_hashers)
    X = np.arange(Xlen)[:, np.newaxis]

    def tlsh_sim(row, col):
        row = int(row)
        col = int(col)

        # skip computation on the diagonal ...
        if row == col:
            return 0
        # print(row, col)  # DEBUG
        # ... and for the lower-left half; this check is needed when only a
        # slice of the matrix is computed (if the whole pairwise matrix fits
        # into memory, sklearn assumes symmetry and skips these pairs anyway)
        if row > col:
            return -1

        # return (np.arange(len(tlsh_hashers)) + 1)[row] + ((np.arange(len(tlsh_hashers)) + 1) / 10)[col]  # DEBUG

        h_row = tlsh_hashers[row]
        h_col = tlsh_hashers[col]
        return h_row.diff(h_col)

    # working_memory is given in MiB; reserve the footprint of a single row
    # (float64, 8 bytes per entry, times the row length) instead of passing 0,
    # which would trigger a warning
    working_memory = 1 / 2**20 * 8 * Xlen
    # number of rows per chunk
    num_rows = 1
    working_memory *= num_rows

    # max_dist: maximum TLSH distance up to which a pair is reported

    # reducer function: per row, keep only the column indexes below the maximum distance
    def reduce_func(D_chunk, start_row):
        # keep entries that are non-negative, at most max_dist and not on the diagonal
        idxs = [
            np.flatnonzero((d >= 0) & (d <= max_dist) & (np.arange(Xlen) != pos))
            for pos, d in enumerate(D_chunk, start_row)
        ]
        vals = [d[i] for d, i in zip(D_chunk, idxs)]
        return idxs, vals

    # NOTE: if the whole NxN matrix fits into working_memory, sklearn assumes
    # symmetry, computes only the top-right half and mirrors it onto the
    # lower-left (-1) half
    pwdst_iter = pairwise_distances_chunked(
        X,
        X,
        metric=tlsh_sim,
        reduce_func=reduce_func,
        n_jobs=-1,
        working_memory=working_memory,
    )

    below_max_dist_idxs = []
    below_max_dist_vals = []

    for chunk_idxs, chunk_vals in tqdm(
        pwdst_iter, total=round(Xlen / num_rows), desc="Filter max_dist"
    ):
        below_max_dist_idxs.extend(chunk_idxs)
        below_max_dist_vals.extend(chunk_vals)

    below_max_dist_idxs = [
        (row, col) for row, cols in enumerate(below_max_dist_idxs) for col in cols
    ]
    below_max_dist_vals = np.concatenate(below_max_dist_vals)

    return below_max_dist_idxs, below_max_dist_vals
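

# A self-contained toy sketch (hypothetical values, never called) of the
# `pairwise_distances_chunked` pattern used above: X holds only row indexes,
# the callable metric looks up the real objects, and `reduce_func` keeps just
# the neighbours within a threshold for each row of a chunk.
def _example_pairwise_chunked():
    values = [0, 3, 4, 10, 11]  # stand-ins for the TLSH hasher objects

    X = np.arange(len(values))[:, np.newaxis]

    def metric(row, col):
        # `row` and `col` are 1-element rows of X, i.e. plain indexes
        return abs(values[int(row)] - values[int(col)])

    def reduce_func(D_chunk, start_row):
        # keep, per row, the column indexes whose distance is at most 2
        return [np.flatnonzero(d <= 2) for d in D_chunk]

    for chunk in pairwise_distances_chunked(
        X, X, metric=metric, reduce_func=reduce_func
    ):
        print(chunk)  # per-row neighbour index arrays, e.g. array([1, 2]) for the row holding 3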


# ----------------------------------------------------------------------------
# iterator over the full pairwise TLSH distance matrix, chunk by chunk
# (computes every row completely, no max_dist shortcut)


def compute_near_duplicates_matrix_iter(tlsh_hashers):
    X = np.arange(len(tlsh_hashers))[:, np.newaxis]

    def tlsh_sim(row, col):
        row = int(row)
        col = int(col)

        # skip computation on the diagonal ...
        if row == col:
            return 0
        # ... and for the lower-left half; this check is needed when only a
        # slice of the matrix is computed (if the whole pairwise matrix fits
        # into memory, sklearn assumes symmetry and skips these pairs anyway)
        if row > col:
            return -1

        h_row = tlsh_hashers[row]
        h_col = tlsh_hashers[col]
        return h_row.diff(h_col)

    # working_memory is given in MiB; reserve the footprint of a single row
    # (float64, 8 bytes per entry, times the row length) instead of passing 0,
    # which would trigger a warning
    working_memory = 1 / 2**20 * 8 * len(tlsh_hashers)
    # number of rows per chunk
    num_rows = 1
    working_memory *= num_rows

    # NOTE: if the whole NxN matrix fits into working_memory, sklearn assumes
    # symmetry, computes only the top-right half and mirrors it onto the
    # lower-left (-1) half
    pwdst_iter = pairwise_distances_chunked(
        X,
        X,
        metric=tlsh_sim,
        reduce_func=None,
        n_jobs=-1,
        working_memory=working_memory,
    )

    yield from pwdst_iter
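

# A small sketch (never called) of consuming the chunk iterator above into a
# single in-memory matrix; only sensible for small collections, the __main__
# block below streams the chunks to disk instead.
def _example_full_matrix(tlsh_hashers):
    chunks = list(compute_near_duplicates_matrix_iter(tlsh_hashers))
    # shape (N, N); depending on chunking, the lower-left half may hold the -1
    # placeholders from `tlsh_sim` instead of mirrored distances
    return np.vstack(chunks)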


# ----------------------------------------------------------------------------
# parallel search using a global `globaltree` VPT instance
# (avoids sending the tree along with every task)


def _find_nearest_vpt_one(digest, i, max_dist):
    global globaltree

    tlsh_hash = tlsh.Tlsh()
    tlsh_hash.fromTlshStr(digest)
    result = globaltree.search(tlsh_hash)

    # no result
    if not result:
        return None
    # too distant
    dist, idx, th_other = result
    if dist > max_dist:
        return None

    # precompute the deduplication key for later
    result_key = tuple([dist] + sorted([i, idx]))

    drow = tlsh_hash.hexdigest()
    dcol = th_other.hexdigest()

    return result_key, (drow, dcol)


def find_nearest_with_VPT_parallel(tlsh_hashers, tlsh_lookup, tree, max_dist=100):
    global globaltree
    globaltree = tree

    results = dict()
    seen = set()

    tlsh_hashers = tqdm(tlsh_hashers, desc="Search for most similar")
    # Parallel(return_generator=True) ?
    # NOTE: passing n_jobs here (e.g. n_jobs=-3) would override the parallel_backend() settings
    for result in Parallel(prefer="processes")(
        delayed(_find_nearest_vpt_one)(th.hexdigest(), i, max_dist)
        for i, th in enumerate(tlsh_hashers)
    ):
        if not result:
            continue

        result_key, (drow, dcol) = result

        dist = result_key[0]

        if result_key in seen:
            continue
        if dist not in results:
            results[dist] = list()

        erow = tlsh_lookup[drow]
        ecol = tlsh_lookup[dcol]
        results[dist].append(((drow, dcol), (erow, ecol)))
        seen.add(result_key)

    del globaltree

    return results
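

# A usage sketch (hypothetical arguments, never called) for picking the joblib
# backend from the caller via `parallel_backend()`; the __main__ block below
# does the same with 8 processes. Because the VPT is shared through the
# module-level `globaltree`, this is written for fork-based "multiprocessing"
# workers (or "threading"); spawn-based workers would not see the global.
def _example_vpt_parallel_backend(tlsh_hashers, tlsh_lookup, tree):
    with parallel_backend("multiprocessing", n_jobs=4):
        return find_nearest_with_VPT_parallel(
            tlsh_hashers, tlsh_lookup, tree, max_dist=50
        )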


# ----------------------------------------------------------------------------
# non-parallel variant of `find_nearest_with_VPT_parallel`


def find_nearest_with_VPT(tlsh_hashers, tlsh_lookup, tree, max_dist=100):
    results = dict()
    seen = set()
    for i, th in enumerate(tqdm(tlsh_hashers, desc="Search for most similar")):
        result = tree.search(th)
        if not result:
            continue

        dist, idx, th_other = result
        if dist > max_dist:
            continue

        result_key = tuple([dist] + sorted([i, idx]))
        if result_key in seen:
            continue

        if dist not in results:
            results[dist] = list()

        drow = tlsh_hashers[i].hexdigest()
        dcol = tlsh_hashers[idx].hexdigest()
        erow = tlsh_lookup[drow]  # file
        ecol = tlsh_lookup[dcol]
        results[dist].append(((drow, dcol), (erow, ecol)))
        seen.add(result_key)

    return results


# ----------------------------------------------------------------------------
# JSON encoder for serializing byte strings


class ByteJSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, bytes):
            return o.decode()
        # fall back to the base class, which raises TypeError for unsupported types
        return super().default(o)
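

# A minimal usage sketch (hypothetical value, never called): byte values are
# decoded to str before serialization. Note that `default()` is only consulted
# for values json cannot handle natively, not for dictionary keys.
def _example_byte_json():
    return json.dumps({"file": b"report.bin"}, cls=ByteJSONEncoder)  # '{"file": "report.bin"}'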


# ----------------------------------------------------------------------------
# caching of TLSH hashes (one tab-separated "digest<TAB>sourcefile" pair per line)


def _load_tlsh_file(filename, encoding="utf-8"):
    tlsh_lookup = dict()
    tlsh_hashers = list()

    with open(filename, "rb") as fp:
        for line in tqdm(fp, desc="Load TLSH hashes"):
            digest, sourcefile = line.rstrip().split(b"\t")
            digest = digest.decode(encoding)
            sourcefile = sourcefile.decode(encoding)
            tlsh_lookup[digest] = sourcefile

            # regenerate the TLSH hash object from its hex digest
            tlsh_hash = tlsh.Tlsh()
            tlsh_hash.fromTlshStr(digest)
            tlsh_hashers.append(tlsh_hash)

    return tlsh_lookup, tlsh_hashers


def _save_tlsh_file(filename, tlsh_lookup, encoding="utf-8"):
    if not tlsh_lookup:
        return

    with open(filename, "wb") as fp:
        for digest, sourcefile in tlsh_lookup.items():
            fp.write(digest.encode(encoding))
            fp.write(b"\t")
            fp.write(sourcefile.encode(encoding))
            fp.write(b"\n")
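

# A round-trip sketch (hypothetical cache path, never called): hashes computed
# once can be reloaded on later runs instead of re-hashing every source file.
def _example_tlsh_cache_roundtrip(tlsh_lookup):
    _save_tlsh_file("hashes.tsv", tlsh_lookup)
    return _load_tlsh_file("hashes.tsv")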


# ----------------------------------------------------------------------------


if __name__ == "__main__":
    import argparse

    # fmt: off
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("file_or_folder", type=str, help="Input file or folder with source files")
    parser.add_argument("-t", "--tlshfile", dest="tlshfilename", type=str, help="Input/output file for TLSH hashes")
    parser.add_argument("--errorfile", dest="errorfilename", type=str, help="Output file for errors")
    parser.add_argument("--dupsfile", dest="duplicatefilename", type=str, help="Output file for duplicates (same TLSH)")

    subparsers = parser.add_subparsers(dest="action", required=True, help="Actions")

    parser_maxdist = subparsers.add_parser("maxdist", help="Compute TLSH distances below 'max_dist'")
    parser_matrix = subparsers.add_parser("matrix", help="Compute full NxN TLSH distance matrix")
    parser_mostsimilar = subparsers.add_parser("mostsimilar", help="Compute most similar TLSH document pairs")
    parser_hash = subparsers.add_parser("hash", help="Precompute TLSH hashes only (requires -t/--tlshfile to be set)")

    parser_maxdist.add_argument("-d", "--maxdist", dest="max_dist", default=100, type=int)
    parser_maxdist.add_argument("-r", "--resultfile", dest="resultfilename", default="result.json", type=str)
    parser_matrix.add_argument("-r", "--resultfile", dest="resultfilename", default="result.txt", type=str)
    parser_mostsimilar.add_argument("-r", "--resultfile", dest="resultfilename", default="result.json", type=str)
    parser_mostsimilar.add_argument("-d", "--maxdist", dest="max_dist", default=100, type=int)
    # fmt: on

    args = parser.parse_args()

    if not os.path.exists(args.file_or_folder):
        raise FileNotFoundError(args.file_or_folder)

    if args.action == "hash" and not args.tlshfilename:
        parser.error("Action 'hash' requires '-t/--tlshfile' to be set!")

    if args.action == "maxdist" and args.max_dist < 0:
        parser.error("Action 'maxdist' requires '-d/--maxdist' to be 0 or larger!")

    # ------------------------------------------

    if (
        args.action != "hash"
        and args.tlshfilename
        and os.path.exists(args.tlshfilename)
    ):
        print("[!] Found existing TLSH hash file. Loading pre-computed hashes ...")

        tlsh_lookup, tlsh_hashers = _load_tlsh_file(args.tlshfilename)

        print("[*] Number of hashes: {}".format(len(tlsh_lookup)))

    else:
        files = [args.file_or_folder]
        if os.path.isdir(args.file_or_folder):
            files = [
                os.path.join(args.file_or_folder, f)
                for f in os.listdir(args.file_or_folder)
            ]
        tlsh_lookup, tlsh_hashers, duplicates, failures = build_hashes(files)

        print("[*] Number of hashes: {}".format(len(tlsh_lookup)))
        print(
            "[*] Number of duplicates: {} ({} total)".format(
                len(duplicates), sum(map(len, duplicates.values()))
            )
        )
        print("[*] Number of failures (for TLSH hash computation): {}".format(len(failures)))  # fmt: skip

        if args.tlshfilename:
            _save_tlsh_file(args.tlshfilename, tlsh_lookup)

            print("[*] TLSH hashes stored in '{}'".format(args.tlshfilename))

        if args.errorfilename:
            # store failures as [entry1..N]
            with open(args.errorfilename, "w") as fp:
                json.dump(failures, fp, indent=2, cls=ByteJSONEncoder)

            print(
                "[*] Errors (TLSH computation) stored in '{}'".format(
                    args.errorfilename
                )
            )

        del failures

        if args.duplicatefilename:
            # store duplicates as digest -> [entry1..N]
            with open(args.duplicatefilename, "w") as fp:
                json.dump(duplicates, fp, indent=2, cls=ByteJSONEncoder)

            print("[*] Duplicates stored in '{}'".format(args.duplicatefilename))

        del duplicates

    # ------------------------------------------

    if args.action == "maxdist":
        print("[*] Find near duplicates with max_dist={} ...".format(args.max_dist))

        below_max_dist_idxs, below_max_dist_vals = find_near_duplicates(
            tlsh_hashers, max_dist=args.max_dist
        )

        print("[*] Number of near duplicates: {}".format(len(below_max_dist_vals)))
        if len(below_max_dist_vals):
            vals, cnts = np.unique(below_max_dist_vals, return_counts=True)
            print(
                "[*] Near duplicates: {}".format(
                    ", ".join("[{}]: {}".format(int(v), c) for v, c in zip(vals, cnts))
                )
            )

        # store near duplicates as dist -> [((digestA, digestB), (entryA, entryB)), ...]
        result = dict()
        for val in np.unique(below_max_dist_vals):
            result[int(val)] = list()

        for (row, col), val in zip(below_max_dist_idxs, below_max_dist_vals):
            val = int(val)
            if val not in result:
                result[val] = list()
            drow = tlsh_hashers[row].hexdigest()
            dcol = tlsh_hashers[col].hexdigest()
            erow = tlsh_lookup[drow]  # file
            ecol = tlsh_lookup[dcol]
            result[val].append(((drow, dcol), (erow, ecol)))

        with open(args.resultfilename, "w") as fp:
            json.dump(result, fp, indent=2, cls=ByteJSONEncoder)

        print("[*] Results stored in '{}'".format(args.resultfilename))

    # ------------------------------------------

    elif args.action == "matrix":
        print("[*] Compute complete pairwise document distance matrix ...")

        matrix_iter = compute_near_duplicates_matrix_iter(tlsh_hashers)

        with open(args.resultfilename, "w") as fp:
            for chunks in tqdm(matrix_iter, desc="Compute dist matrix"):
                np.savetxt(fp, chunks, fmt="%u")

        print("[*] Results stored in '{}'".format(args.resultfilename))

    # ------------------------------------------

    elif args.action == "mostsimilar":
        print(
            "[*] Compute most similar near document duplicates with max_dist={} ...".format(
                args.max_dist
            )
        )

        print("[*] Build Vantage Point Tree ...")
        tree = VantagePointTree.build(tlsh_hashers)

        with parallel_backend("multiprocessing", n_jobs=8):
            results = find_nearest_with_VPT_parallel(
                tlsh_hashers, tlsh_lookup, tree, max_dist=args.max_dist
            )

        results = {key: results[key] for key in sorted(results.keys())}

        with open(args.resultfilename, "w") as fp:
            json.dump(results, fp, indent=2, cls=ByteJSONEncoder)

        print("[*] Results stored in '{}'".format(args.resultfilename))