Skip to content

Instantly share code, notes, and snippets.

@dtrizna
Last active December 8, 2022 03:05
Show Gist options
  • Save dtrizna/1d087913d54536fc6319a5a414da0d90 to your computer and use it in GitHub Desktop.
Save dtrizna/1d087913d54536fc6319a5a414da0d90 to your computer and use it in GitHub Desktop.
import json
import logging
import sqlite3
import time
from datetime import datetime, timedelta
from os import path
# ===================
# DATABASE OPERATIONS
# ===================
def sqlite_instantiate(dbconfig, table="hashes"):
    """Create the hash database if it is missing, otherwise purge stale rows.

    Args:
        dbconfig: Either a mapping with a "path" key (database file) and an
            optional "store_days" key (retention in days, defaults to 90),
            or a plain string holding the database file path.
        table: Name of the table holding the (hash, last_seen) rows.

    Returns:
        True on success, False if sqlite3 raised an OperationalError
        (the error is logged).
    """
    # Accept a bare path as well as a config mapping.
    if isinstance(dbconfig, str):
        dbconfig = {"path": dbconfig}
    dbname = dbconfig["path"]
    try:
        if not path.isfile(dbname):
            # DB file doesn't exist - create it together with the table.
            con = sqlite3.connect(dbname)
            try:
                # Table names cannot be bound as SQL parameters, hence the
                # f-string; `table` must come from trusted configuration.
                con.execute(f"CREATE TABLE {table} (hash text, last_seen text)")
                con.commit()
            finally:
                con.close()
        else:
            # DB file exists - clean old hashes from the same table.
            sqlite_cleanup(dbname, days=dbconfig.get("store_days", 90), table=table)
        return True
    except sqlite3.OperationalError as ex:
        logging.error("[-] Unexpected behavior during sqlite3 operations: %s", ex)
        return False
def sqlite_cleanup(dbname, days=90, table="hashes"):
    """Delete rows whose last_seen date is `days` or more days in the past.

    Args:
        dbname: Path of the sqlite database file.
        days: Retention period in days.
        table: Name of the table holding the (hash, last_seen) rows.

    Returns:
        The number of rows deleted.
    """
    # last_seen is stored as "YYYY-MM-DD" text, so a plain string comparison
    # orders the values chronologically.
    cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    con = sqlite3.connect(dbname)
    try:
        # The cutoff is bound as a parameter; only the table name (which
        # cannot be parameterized) is interpolated into the statement.
        cur = con.execute(f"DELETE FROM {table} WHERE last_seen <= ?", (cutoff,))
        con.commit()
        return cur.rowcount
    finally:
        con.close()
def check_hashlist(dbname, h_list, table="hashes"):
    """Return the hashes from `h_list` that are not yet stored in the database.

    Args:
        dbname: Path of the sqlite database file.
        h_list: Iterable of hash strings to look up.
        table: Name of the table holding the (hash, last_seen) rows.

    Returns:
        A list with the members of `h_list` absent from `table`, in their
        original order.
    """
    # The hash is bound as a parameter; only the table name (which cannot be
    # parameterized) is interpolated into the statement.
    query = f"SELECT EXISTS(SELECT 1 FROM {table} WHERE hash = ?)"
    con = sqlite3.connect(dbname)
    try:
        cur = con.cursor()
        h_list_filtered = []
        for h in h_list:
            cur.execute(query, (h,))
            # returns (1,) if hash exists in DB and (0,) if doesn't
            if cur.fetchone()[0] == 0:
                h_list_filtered.append(h)
        return h_list_filtered
    finally:
        con.close()
def put_hash_in_db(dbname, h, table="hashes"):
    """Insert hash `h` into the database, stamped with today's date.

    Args:
        dbname: Path of the sqlite database file.
        h: Hash string to store.
        table: Name of the table holding the (hash, last_seen) rows.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    con = sqlite3.connect(dbname)
    try:
        con.execute(f"INSERT INTO {table} VALUES (?, ?)", (h, today))
        con.commit()
    finally:
        # Always release the connection, even if the insert fails.
        con.close()
# ==============
# VIRUSTOTAL API
# ==============
def virustotal_analysis(h):
    """Query the VirusTotal v3 files endpoint for hash `h`.

    The API key is taken from the module-level VTAPI_KEY. The object
    returned by requests.get is handed back to the caller unchanged.
    """
    # NOTE(review): `requests` is not among the imports visible at the top
    # of this file - confirm it is imported before this function is called.
    url = f"https://virustotal.com/api/v3/files/{h}"
    auth_headers = {"X-APIKEY": VTAPI_KEY}
    return requests.get(url, headers=auth_headers)
# ============================
# DEFINE - DEPLOYMENT SPECIFIC
# ============================
DB_PATH = "/opt/zipf/hash.db"
# Hashes seen fewer than THRESHOLD times are treated as out of baseline.
THRESHOLD = 10
# Read the API key with the file closed afterwards, and strip surrounding
# whitespace: a trailing newline is not valid inside an HTTP header value.
with open("mykey.api") as _key_file:
    VTAPI_KEY = _key_file.read().strip()
QUERY_WINDOW = 60 * 60 * 24  # 1d
def get_data(QUERY_WINDOW):
    """Deployment-specific data source; must be implemented per environment.

    An ElasticSearch example is defined in
    https://gist.github.com/dtrizna/b027c7ffbba48b1574f0f00e69604ed3

    Expected return format: [(value1, count1), (value2, count2), ...]
    QUERY_WINDOW is presumably the look-back period in seconds - confirm
    against the chosen implementation.
    """
    raise NotImplementedError()
def report(h):
    """Deployment-specific alert hook; must be implemented per environment.

    Intended to send an event about malicious hash `h` to SIEM / SOAR, or a
    message via mail / slack / etc.
    """
    raise NotImplementedError()
# =========
# MAIN LOOP
# =========
if __name__ == "__main__":
    # Retention period (days) for hashes that were already checked.
    STORE_DAYS = 90
    # sqlite_instantiate indexes its first argument by "path" and
    # "store_days", so pass a config mapping rather than the bare path.
    sqlite_instantiate({"path": DB_PATH, "store_days": STORE_DAYS})
    while True:
        # Values seen fewer than THRESHOLD times within the query window.
        out_of_baseline = [h for h, count in get_data(QUERY_WINDOW) if count < THRESHOLD]
        # Drop hashes that are already recorded in the database.
        out_of_baseline_new = check_hashlist(DB_PATH, out_of_baseline)
        for h in out_of_baseline_new:
            vt_results = virustotal_analysis(h)
            if vt_results.status_code == 200:
                vt_results_json = json.loads(vt_results.text)
                try:
                    stats = vt_results_json["data"]["attributes"]["last_analysis_stats"]
                    mal_score = stats["malicious"]
                except KeyError:  # hash probably missing in VT
                    # Not recorded, so it is looked up again next cycle.
                    continue
                if mal_score >= 1:  # at least one vendor marks hash as malicious
                    report(h)
            # NOTE(review): the nesting level of this call is an assumption;
            # here every hash that did not hit the KeyError path is recorded,
            # including ones with a non-200 response - confirm intent.
            put_hash_in_db(DB_PATH, h)
        sqlite_cleanup(DB_PATH, days=STORE_DAYS)
        time.sleep(QUERY_WINDOW)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment