Last active
December 8, 2022 03:05
-
-
Save dtrizna/1d087913d54536fc6319a5a414da0d90 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
import sqlite3
import time
from datetime import datetime, timedelta
from os import path
# =================== | |
# DATABASE OPERATIONS | |
# =================== | |
def sqlite_instantiate(dbconfig, table="hashes"):
    """Create the hash database if it is missing, otherwise purge old rows.

    Args:
        dbconfig: Mapping with the key ``"path"`` (location of the SQLite
            file) and ``"store_days"`` (retention period handed to
            ``sqlite_cleanup``; only read when the file already exists).
        table: Name of the table holding the hashes. It is interpolated into
            the SQL text, so it must come from trusted code.

    Returns:
        True when the database is ready, False when SQLite reported an
        ``OperationalError`` (the error is logged).
    """
    dbname = dbconfig["path"]
    try:
        if not path.isfile(dbname):
            # No DB file yet: create it together with the hash table.
            con = sqlite3.connect(dbname)
            try:
                con.execute(f"""CREATE TABLE {table} (hash text, last_seen text)""")
                con.commit()
            finally:
                con.close()
        else:
            # DB file exists: drop hashes older than the retention period,
            # in the same table this function was asked to manage.
            sqlite_cleanup(dbname, days=dbconfig['store_days'], table=table)
        return True
    except sqlite3.OperationalError as ex:
        logging.error(f"[-] Unexpected behavior during sqlite3 operations: {ex}")
        return False
def sqlite_cleanup(dbname, days=90, table="hashes"):
    """Delete hashes whose ``last_seen`` date is at least `days` days old.

    Args:
        dbname: Path of the SQLite database file.
        days: Retention period in days; rows with ``last_seen`` on or before
            today minus `days` are removed.
        table: Name of the table to purge. It is interpolated into the SQL
            text (identifiers cannot be bound as parameters), so it must come
            from trusted code.

    Returns:
        The number of rows that were deleted.
    """
    # Dates are compared as "%Y-%m-%d" strings, which sort chronologically.
    cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    con = sqlite3.connect(dbname)
    try:
        cur = con.execute(f"DELETE FROM {table} WHERE last_seen <= ?", (cutoff,))
        con.commit()
        return cur.rowcount
    finally:
        # Always release the file handle, even when the DELETE fails.
        con.close()
def check_hashlist(dbname, h_list, table="hashes"):
    """Return the hashes from `h_list` that are not stored in the database.

    Args:
        dbname: Path of the SQLite database file.
        h_list: Iterable of hash strings to look up.
        table: Name of the table holding the hashes. It is interpolated into
            the SQL text, so it must come from trusted code.

    Returns:
        A list with the elements of `h_list`, in their original order, for
        which no row with a matching ``hash`` column exists.
    """
    # The hash value is bound as a parameter: it originates from collected
    # data and must never be spliced into the SQL text.
    query = f"SELECT EXISTS(SELECT 1 FROM {table} WHERE hash = ?)"
    con = sqlite3.connect(dbname)
    try:
        # EXISTS yields (1,) when the hash is present and (0,) when it is not.
        return [h for h in h_list if con.execute(query, (h,)).fetchone()[0] == 0]
    finally:
        con.close()
def put_hash_in_db(dbname, h, table="hashes"):
    """Store hash `h` in the database, stamped with today's date.

    Args:
        dbname: Path of the SQLite database file.
        h: Hash string to store.
        table: Name of the table holding the hashes. It is interpolated into
            the SQL text, so it must come from trusted code.
    """
    # last_seen is written as "%Y-%m-%d" text.
    today = datetime.now().strftime("%Y-%m-%d")
    con = sqlite3.connect(dbname)
    try:
        con.execute(f"""INSERT INTO {table} VALUES (?, ?)""", (h, today))
        con.commit()
    finally:
        # Always release the file handle, even when the INSERT fails.
        con.close()
# ============== | |
# VIRUSTOTAL API | |
# ============== | |
def virustotal_analysis(h, timeout=30):
    """Request the VirusTotal v3 file report for hash `h`.

    Args:
        h: File hash to look up.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the caller indefinitely.

    Returns:
        The HTTP response object returned by ``requests.get``; the caller is
        expected to inspect its status code and body.
    """
    # NOTE(review): `requests` is used here but is not among the imports at
    # the top of this file - add `import requests` there.
    return requests.get(
        f"https://virustotal.com/api/v3/files/{h}",
        headers={"X-APIKEY": VTAPI_KEY},
        timeout=timeout,
    )
# ============================ | |
# DEFINE - DEPLOYMENT SPECIFIC | |
# ============================ | |
# Location of the SQLite file that remembers already analysed hashes.
DB_PATH = "/opt/zipf/hash.db"
# Values whose count is below this threshold are treated as out of baseline.
THRESHOLD = 10
# Read the VirusTotal API key; strip surrounding whitespace because a trailing
# newline is not a valid HTTP header value, and close the file right away.
with open("mykey.api") as key_file:
    VTAPI_KEY = key_file.read().strip()
# Seconds between two polling rounds of the main loop.
QUERY_WINDOW = 60 * 60 * 24  # 1d
def get_data(QUERY_WINDOW):
    """Deployment-specific stub that fetches the value counts to analyse.

    An ElasticSearch example is defined in
    https://gist.github.com/dtrizna/b027c7ffbba48b1574f0f00e69604ed3

    Args:
        QUERY_WINDOW: Presumably the look-back window to query, in the same
            unit as the module-level QUERY_WINDOW - confirm when implementing.

    Returns:
        Expected format: ``[(value1, count1), (value2, count2), ...]``.

    Raises:
        NotImplementedError: Always, until a deployment provides a body.
    """
    raise NotImplementedError
def report(h):
    """Deployment-specific stub that raises an alert for malicious hash `h`.

    Intended to send an event to a SIEM / SOAR, or a message via mail,
    Slack, etc.

    Raises:
        NotImplementedError: Always, until a deployment provides a body.
    """
    raise NotImplementedError
# ========= | |
# MAIN LOOP | |
# ========= | |
if __name__ == "__main__":
    # Poll forever: fetch value counts, look up rare and not-yet-seen hashes
    # on VirusTotal, report malicious ones and remember what was analysed.
    store_days = 90

    # sqlite_instantiate() reads dbconfig["path"] and dbconfig["store_days"],
    # so it needs a mapping rather than the bare path string.
    if not sqlite_instantiate({"path": DB_PATH, "store_days": store_days}):
        raise SystemExit(f"[-] Cannot initialise hash database at {DB_PATH}")

    while True:
        # Hashes seen fewer than THRESHOLD times are out of baseline.
        out_of_baseline = [
            h for h, count in get_data(QUERY_WINDOW) if count < THRESHOLD
        ]
        # Only hashes not already stored in the DB are sent to VirusTotal.
        out_of_baseline_new = check_hashlist(DB_PATH, out_of_baseline)

        for h in out_of_baseline_new:
            try:
                vt_results = virustotal_analysis(h)
            except requests.RequestException as ex:
                # A transient network failure must not stop the loop; the
                # hash is not stored, so it is looked up again next round.
                logging.warning(f"[-] VirusTotal lookup failed for {h}: {ex}")
                continue
            if vt_results.status_code != 200:
                continue

            vt_results_json = json.loads(vt_results.text)
            try:
                stats = vt_results_json["data"]["attributes"]["last_analysis_stats"]
                mal_score = stats["malicious"]
            except KeyError:  # hash probably missing in VT
                continue

            if mal_score >= 1:  # at least one vendor marks hash as malicious
                report(h)
            # NOTE(review): the original indentation was lost; storing only
            # after a successful analysis matches the `continue` above, which
            # skips storage for hashes VirusTotal could not describe - confirm.
            put_hash_in_db(DB_PATH, h)

        sqlite_cleanup(DB_PATH, days=store_days)
        time.sleep(QUERY_WINDOW)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment