Skip to content

Instantly share code, notes, and snippets.

@pedramamini
Last active July 31, 2021 14:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pedramamini/ebc2ca356b34f0bc12876d7a329847de to your computer and use it in GitHub Desktop.
Save pedramamini/ebc2ca356b34f0bc12876d7a329847de to your computer and use it in GitHub Desktop.
InQuest Labs Daily Sample Harvest
#!/usr/bin/env python3
"""
InQuest Labs daily sample harvest.

This script is intended to be run via a daily cron job and will retrieve up to MAX_DAILY interesting samples with
relevant JSON metadata containing extruded layers, IOCs, etc.

Python 3 is required (the script relies on the print() function's keyword arguments).

This script requires the installation of:
    https://github.com/inquest/python-inquestlabs

Here's an exemplar:
    https://inquest.net/blog/2020/07/27/Tale-of-a-Polished-Carrier
"""

# API key handed to the inquestlabs client; left empty by default.
API_KEY = ""

# number of "potentially interesting" records considered per run.
MAX_DAILY = 5

# maximum number of validated domain / IP attributes looked up per sample.
MAX_ATTRIBUTE = 10

# directory that receives the harvested samples and their JSON side-car files.
SAMPLE_PATH = "./labs.inquest.net"

# domain IOCs ending in any of these suffixes are skipped without being looked up.
IGNORED_TLDS = [".name", ".sc", ".st", ".su", ".to", ".fm"]
### SHOULD NOT NEED TO MODIFY BELOW THIS LINE
import os
import json
import socket
import inquestlabs
########################################################################################################################
def is_valid_domain(domain):
    """
    Return True if domain resolves, False otherwise.

    Resolution is delegated to socket.gethostbyname(), so the call blocks until the resolver answers or gives up.

    :param domain: Host name to resolve.
    :return: True when the resolver hands back a non-empty address, False when resolution fails or the name is
             malformed.
    """
    try:
        return bool(socket.gethostbyname(domain))
    except (OSError, TypeError, ValueError):
        # OSError covers resolver failures (socket.gaierror / socket.herror are subclasses), ValueError covers
        # names that cannot be encoded (empty or over-long labels raise UnicodeError), and TypeError covers
        # non-string input. Anything else, notably KeyboardInterrupt, is deliberately allowed to propagate.
        return False
########################################################################################################################
def is_valid_ip(ip):
    """
    Return True if IP looks legit, False otherwise.

    "Looks legit" means socket.inet_aton() can parse it as an IPv4 address. Note that inet_aton() is lenient: it
    also accepts strings with fewer than three dots (e.g. "127.1"). IPv6 addresses are not recognised.

    :param ip: Candidate IPv4 address string.
    :return: True when inet_aton() accepts the string, False otherwise.
    """
    try:
        socket.inet_aton(ip)
    except (OSError, TypeError, ValueError):
        # OSError is what inet_aton() raises for an unparsable address; TypeError / ValueError cover non-string
        # input and embedded NULs. Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        return False

    return True
########################################################################################################################
if __name__ == "__main__":
    # Script entry point: enumerate samples, harvest a handful of them to SAMPLE_PATH, then cross-reference the
    # domain / IP attributes of each harvested sample against the reputation and IOC databases.
    iq = inquestlabs.inquestlabs_api(API_KEY)

    very_malicious = []
    potentially_interesting = []

    print("enumerating malicious samples...")

    # bucket the listed records by their 'vt_positives' count; records with 4..20 positives land in neither bucket.
    for record in iq.dfi_list(malicious=True, has_code=True):
        if record['vt_positives'] > 20:
            very_malicious.append(record)
        elif record['vt_positives'] <= 3:
            potentially_interesting.append(record)

    print("found %d very malicious and %d potentially interesting" % (len(very_malicious), len(potentially_interesting)))
    print("harvesting the first %d potentially interesting samples to %s" % (MAX_DAILY, SAMPLE_PATH))

    # make sure the output directory exists, otherwise the very first write below fails on a fresh checkout.
    os.makedirs(SAMPLE_PATH, exist_ok=True)

    # str.endswith() accepts a tuple of suffixes; build it once rather than looping over the list per attribute.
    ignored_suffixes = tuple(IGNORED_TLDS)

    # NOTE(review): previously harvested samples still occupy one of the MAX_DAILY slots, so a run can harvest
    # fewer than MAX_DAILY new samples. Behavior kept as-is; confirm whether that is intended.
    for record in potentially_interesting[:MAX_DAILY]:
        sha256 = record['sha256']
        path = os.path.join(SAMPLE_PATH, sha256)

        # the presence of the downloaded sample file marks a sample as already processed.
        if os.path.exists(path + ".vx"):
            print("skipping %s, previously harvested." % sha256)
            continue

        print("harvesting %s" % sha256)

        # download sample. flush so the progress text is visible while the request is still in flight.
        # NOTE(review): if anything after the download raises, the .vx file is left behind and the sample will be
        # skipped on later runs without its metadata ever being written.
        print("downloading sample...", end="", flush=True)
        iq.dfi_download(sha256, path + ".vx")
        print("done.")

        # retrieve sample details and persist them next to the sample.
        print("retrieving details...", end="", flush=True)
        details = iq.dfi_details(sha256, attributes=True)

        with open(path + ".json", "w") as fh:
            json.dump(details, fh)

        print("done.")

        # check domains and IPs against REP/IOC databases.
        num_attributes = 0
        all_rep_hits = []
        all_ioc_hits = []

        for a in details['attributes']:
            kind = a['attribute']
            ioc = a['value'].lower()

            # domains under an ignored TLD are never looked up and do not count towards MAX_ATTRIBUTE.
            if kind == "domain" and ioc.endswith(ignored_suffixes):
                print("skipping over IOC: %s" % ioc)
                continue

            # we've exhausted the maximum
            if num_attributes >= MAX_ATTRIBUTE:
                print("exhausted max attribute count of %s, moving on to next sample." % MAX_ATTRIBUTE)
                break

            # only seemingly valid IPs or domains are worth a lookup; every other attribute is ignored.
            if not ((kind == "domain" and is_valid_domain(ioc)) or (kind == "ip" and is_valid_ip(ioc))):
                continue

            num_attributes += 1
            print("checking %s against repdb..." % ioc)

            # the two lookups are best-effort: a failure is reported and harvesting carries on.
            try:
                rep_hits = iq.repdb_search(ioc)

                if rep_hits:
                    all_rep_hits.append(rep_hits)
            except Exception as e:
                print("error querying REPDB for %s/%s:\n%s" % (sha256, ioc, str(e)))

            try:
                ioc_hits = iq.iocdb_search(ioc)

                if ioc_hits:
                    all_ioc_hits.append(ioc_hits)
            except Exception as e:
                print("error querying IOCDB for %s/%s:\n%s" % (sha256, ioc, str(e)))

        # write JSON for collated REPDB hits.
        if all_rep_hits:
            with open(path + ".repdb.json", "w") as fh:
                json.dump(all_rep_hits, fh)

        # write JSON for collated IOCDB hits.
        if all_ioc_hits:
            with open(path + ".iocdb.json", "w") as fh:
                json.dump(all_ioc_hits, fh)

        print("completed harvesting %s" % sha256)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment