Skip to content

Instantly share code, notes, and snippets.

@9b 9b/README
Last active Aug 6, 2019

Embed
What would you like to do?
Small script to request WHOIS information from RiskIQ
Modify the script to include your username and API key.
Create a virtualenv to keep your space clean:
$ virtualenv -p python3 venv3
Activate it:
$ source venv3/bin/activate
Install the reqs:
$(venv3) pip install -r requirements.txt
Perform a single query:
$(venv3) python riskiq-whois.py search -q riskiq.com -o foo.csv
Peform a bunch of queries from file:
$(venv3) python riskiq-whois.py search -f domains -o foo.csv
import requests
import json
import time
import csv
username = ''
key = ''
auth = (username, key)
base_url = 'https://api.passivetotal.org'
header_added = False
def passivetotal_get(path, query):
url = base_url + path
data = {'query': query}
response = requests.get(url, auth=auth, json=data)
return response.json()
with open('domains-whois.csv','w') as oF:
with open('domains.txt','r') as f:
for line in f:
b=""
print "working on domain %s" %(line.rstrip())
pwhois = passivetotal_get('/v2/whois', line.rstrip())
try:
b = str(pwhois["admin"])+"\t"+str(pwhois["billing"])+"\t"+str(pwhois["contactEmail"])+"\t"+str(pwhois["domain"])+"\t"+str(pwhois["expiresAt"])+"\t"+str(pwhois["lastLoadedAt"])+"\t"+str(pwhois["name"])+"\t"+str(pwhois["nameServers"])+"\t"+str(pwhois["organization"])+"\t"+str(pwhois["registered"])+"\t"+str(pwhois["registrant"])+"\t"+str(pwhois["registrar"])+"\t"+str(pwhois["registryUpdatedAt"])+"\t"+str(pwhois["tech"])+"\t"+str(pwhois["telephone"])+"\t"+str(pwhois["whoisServer"])+"\t"+str(pwhois["zone"])+"\n"
except:
for k,v in sorted(pwhois.items()):
if k not in 'text':
b = b+str(v)+"\t"
b = b+"\n"
else:
print(b)
oF.write(b)
print "DONE!\n"
certifi==2019.3.9
chardet==3.0.4
idna==2.8
requests==2.22.0
requests-futures==0.9.9
urllib3==1.25.3
#!/usr/bin/env python
"""Handle getting WHOIS information from RiskIQ."""
import json
import logging
import sys
from argparse import ArgumentParser
from concurrent.futures import wait
from requests_futures.sessions import FuturesSession
USERNAME = ""
API_KEY = ""
AUTH = (USERNAME, API_KEY)
BASE_URL = "https://api.passivetotal.org"
WHOIS_URL = "%s/v2/whois" % (BASE_URL)
LOG_LEVEL = logging.DEBUG
OUTPUT_HEADERS = [
'Admin',
'Billing',
'Contact Email',
'Domain',
'Expires',
'Last Loaded',
'Name',
'Name Servers',
'Organization',
'Registered',
'Telephone',
'WHOIS Server',
'Zone'
]
def gen_logger(name, log_level=logging.INFO):
"""Create a logger to be used between processes."""
logger = logging.getLogger(name)
logger.setLevel(log_level)
shandler = logging.StreamHandler = logging.StreamHandler(sys.stdout)
fmt = '\033[1;32m%(levelname)-5s %(module)s:%(funcName)s():'
fmt += '%(lineno)d %(asctime)s\033[0m| %(message)s'
shandler.setFormatter(logging.Formatter(fmt))
logger.addHandler(shandler)
return logger
LOG = gen_logger("RiskIQ-WHOIS", LOG_LEVEL) # Gross location, but cleaner than print
def bulk_request(queries):
"""Prepare the requests to be launched into Internet space."""
LOG.debug("[#] Preparing the queries to run.")
queries = list(set(queries)) # Ensure we have no dupes
session = FuturesSession(max_workers=len(queries)) # Not the smartest calc
futures = [
session.get(WHOIS_URL, json={'query': query}, auth=AUTH, timeout=10)
for query in queries
]
done, _ = wait(futures)
results = list()
LOG.info("[*] Processing request results.")
for response in done:
try:
result = response.result() # Give me the future
if result.status_code not in [200]:
raise Exception("Response code was not valid.")
result = json.loads(result.content)
results.append(result)
except Exception as err:
LOG.warn("[!] Failed result: %s" % err)
return results
def format_output_pt(responses):
"""Format the PassiveTotal query results for CSV output."""
formatted = list()
for response in responses:
record = [
response.get('admin', None),
response.get('billing', None),
response.get('contactEmail', None),
response.get('domain', None),
response.get('expiresAt', None),
response.get('lastLoadedAt', None),
response.get('name', None),
response.get('nameServers', None),
response.get('organization', None),
response.get('registered', None),
response.get('telephone', None),
response.get('whoisServer', None),
response.get('zone', None),
]
record = [str(x) for x in record]
tabbed = '\t'.join(record)
formatted.append(tabbed)
return formatted
def main():
"""Be Human. Do human things."""
parser = ArgumentParser()
subs = parser.add_subparsers(dest='cmd')
setup_parser = subs.add_parser('search')
setup_parser.add_argument('-q', '--query', dest='query', required=False,
help='Single query to run.')
setup_parser.add_argument('-f', '--file', dest='file', required=False,
help='File of queries to process.', type=str)
setup_parser.add_argument('-o', '--out', dest='out', required=False,
help='File to write results out.', type=str)
args = parser.parse_args()
if args.cmd == 'search':
if args.query:
results = bulk_request([args.query])
if args.file:
try:
items = [x.strip() for x in open(args.file).readlines()]
except Exception as err:
LOG.error("[X] Failed to process query file.")
else:
results = bulk_request(items)
if args.out and results:
handle = open(args.out, 'w')
file_headers = '\t'.join(OUTPUT_HEADERS)
handle.write(file_headers + '\n')
writable_results = format_output_pt(results)
for result in writable_results:
handle.write(result + "\n")
handle.close()
sys.exit(1)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.