Skip to content

Instantly share code, notes, and snippets.

@9b
Last active August 6, 2019 20:45
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 9b/b5a6497182babbe8fb4a0658e54aca8d to your computer and use it in GitHub Desktop.
Save 9b/b5a6497182babbe8fb4a0658e54aca8d to your computer and use it in GitHub Desktop.
Small script to request WHOIS information from RiskIQ
Modify the script to include your username and API key.
Create a virtualenv to keep your space clean:
$ virtualenv -p python3 venv3
Activate it:
$ source venv3/bin/activate
Install the reqs:
$(venv3) pip install -r requirements.txt
Perform a single query:
$(venv3) python riskiq-whois.py search -q riskiq.com -o foo.csv
Peform a bunch of queries from file:
$(venv3) python riskiq-whois.py search -f domains -o foo.csv
import requests
import json
import time
import csv
username = ''
key = ''
auth = (username, key)
base_url = 'https://api.passivetotal.org'
header_added = False
def passivetotal_get(path, query):
url = base_url + path
data = {'query': query}
response = requests.get(url, auth=auth, json=data)
return response.json()
with open('domains-whois.csv','w') as oF:
with open('domains.txt','r') as f:
for line in f:
b=""
print "working on domain %s" %(line.rstrip())
pwhois = passivetotal_get('/v2/whois', line.rstrip())
try:
b = str(pwhois["admin"])+"\t"+str(pwhois["billing"])+"\t"+str(pwhois["contactEmail"])+"\t"+str(pwhois["domain"])+"\t"+str(pwhois["expiresAt"])+"\t"+str(pwhois["lastLoadedAt"])+"\t"+str(pwhois["name"])+"\t"+str(pwhois["nameServers"])+"\t"+str(pwhois["organization"])+"\t"+str(pwhois["registered"])+"\t"+str(pwhois["registrant"])+"\t"+str(pwhois["registrar"])+"\t"+str(pwhois["registryUpdatedAt"])+"\t"+str(pwhois["tech"])+"\t"+str(pwhois["telephone"])+"\t"+str(pwhois["whoisServer"])+"\t"+str(pwhois["zone"])+"\n"
except:
for k,v in sorted(pwhois.items()):
if k not in 'text':
b = b+str(v)+"\t"
b = b+"\n"
else:
print(b)
oF.write(b)
print "DONE!\n"
certifi==2019.3.9
chardet==3.0.4
idna==2.8
requests==2.22.0
requests-futures==0.9.9
urllib3==1.25.3
#!/usr/bin/env python
"""Handle getting WHOIS information from RiskIQ."""
import json
import logging
import sys
from argparse import ArgumentParser
from concurrent.futures import wait
from requests_futures.sessions import FuturesSession
USERNAME = ""
API_KEY = ""
AUTH = (USERNAME, API_KEY)
BASE_URL = "https://api.passivetotal.org"
WHOIS_URL = "%s/v2/whois" % (BASE_URL)
LOG_LEVEL = logging.DEBUG
OUTPUT_HEADERS = [
'Admin',
'Billing',
'Contact Email',
'Domain',
'Expires',
'Last Loaded',
'Name',
'Name Servers',
'Organization',
'Registered',
'Telephone',
'WHOIS Server',
'Zone'
]
def gen_logger(name, log_level=logging.INFO):
"""Create a logger to be used between processes."""
logger = logging.getLogger(name)
logger.setLevel(log_level)
shandler = logging.StreamHandler = logging.StreamHandler(sys.stdout)
fmt = '\033[1;32m%(levelname)-5s %(module)s:%(funcName)s():'
fmt += '%(lineno)d %(asctime)s\033[0m| %(message)s'
shandler.setFormatter(logging.Formatter(fmt))
logger.addHandler(shandler)
return logger
LOG = gen_logger("RiskIQ-WHOIS", LOG_LEVEL) # Gross location, but cleaner than print
def bulk_request(queries):
"""Prepare the requests to be launched into Internet space."""
LOG.debug("[#] Preparing the queries to run.")
queries = list(set(queries)) # Ensure we have no dupes
session = FuturesSession(max_workers=len(queries)) # Not the smartest calc
futures = [
session.get(WHOIS_URL, json={'query': query}, auth=AUTH, timeout=10)
for query in queries
]
done, _ = wait(futures)
results = list()
LOG.info("[*] Processing request results.")
for response in done:
try:
result = response.result() # Give me the future
if result.status_code not in [200]:
raise Exception("Response code was not valid.")
result = json.loads(result.content)
results.append(result)
except Exception as err:
LOG.warn("[!] Failed result: %s" % err)
return results
def format_output_pt(responses):
"""Format the PassiveTotal query results for CSV output."""
formatted = list()
for response in responses:
record = [
response.get('admin', None),
response.get('billing', None),
response.get('contactEmail', None),
response.get('domain', None),
response.get('expiresAt', None),
response.get('lastLoadedAt', None),
response.get('name', None),
response.get('nameServers', None),
response.get('organization', None),
response.get('registered', None),
response.get('telephone', None),
response.get('whoisServer', None),
response.get('zone', None),
]
record = [str(x) for x in record]
tabbed = '\t'.join(record)
formatted.append(tabbed)
return formatted
def main():
"""Be Human. Do human things."""
parser = ArgumentParser()
subs = parser.add_subparsers(dest='cmd')
setup_parser = subs.add_parser('search')
setup_parser.add_argument('-q', '--query', dest='query', required=False,
help='Single query to run.')
setup_parser.add_argument('-f', '--file', dest='file', required=False,
help='File of queries to process.', type=str)
setup_parser.add_argument('-o', '--out', dest='out', required=False,
help='File to write results out.', type=str)
args = parser.parse_args()
if args.cmd == 'search':
if args.query:
results = bulk_request([args.query])
if args.file:
try:
items = [x.strip() for x in open(args.file).readlines()]
except Exception as err:
LOG.error("[X] Failed to process query file.")
else:
results = bulk_request(items)
if args.out and results:
handle = open(args.out, 'w')
file_headers = '\t'.join(OUTPUT_HEADERS)
handle.write(file_headers + '\n')
writable_results = format_output_pt(results)
for result in writable_results:
handle.write(result + "\n")
handle.close()
sys.exit(1)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment