Small script to request WHOIS information from RiskIQ
Modify the script to include your username and API key.

Create a virtualenv to keep your space clean:
$ virtualenv -p python3 venv3

Activate it:
$ source venv3/bin/activate

Install the reqs:
(venv3) $ pip install -r requirements.txt

Perform a single query:
(venv3) $ python riskiq-whois.py search -q riskiq.com -o foo.csv

Perform a bunch of queries from a file (see the example query file below):
(venv3) $ python riskiq-whois.py search -f domains -o foo.csv
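Both scripts read the query file one entry per line and strip surrounding whitespace, so a minimal domains file looks like this (the entries below are placeholders, not from the original gist):

riskiq.com
example.com
example.org

Note that although the examples name the output file foo.csv, the results are written tab-separated, with a header row when using riskiq-whois.py.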
import requests

# Fill in your RiskIQ / PassiveTotal credentials here.
username = ''
key = ''
auth = (username, key)
base_url = 'https://api.passivetotal.org'

# WHOIS fields to write, in column order.
FIELDS = [
    'admin', 'billing', 'contactEmail', 'domain', 'expiresAt',
    'lastLoadedAt', 'name', 'nameServers', 'organization', 'registered',
    'registrant', 'registrar', 'registryUpdatedAt', 'tech', 'telephone',
    'whoisServer', 'zone',
]


def passivetotal_get(path, query):
    """Run a single GET against the PassiveTotal API and return the JSON body."""
    url = base_url + path
    data = {'query': query}
    response = requests.get(url, auth=auth, json=data)
    return response.json()


with open('domains-whois.csv', 'w') as out_file, open('domains.txt', 'r') as in_file:
    for line in in_file:
        domain = line.rstrip()
        print("working on domain %s" % domain)
        pwhois = passivetotal_get('/v2/whois', domain)
        try:
            # Normal case: pull the expected fields in a fixed order.
            row = "\t".join(str(pwhois[field]) for field in FIELDS) + "\n"
        except KeyError:
            # Fallback: dump whatever fields came back, skipping the raw 'text' blob.
            row = "\t".join(
                str(v) for k, v in sorted(pwhois.items()) if k != 'text'
            ) + "\n"
        print(row)
        out_file.write(row)

print("DONE!\n")
certifi==2019.3.9
chardet==3.0.4
idna==2.8
requests==2.22.0
requests-futures==0.9.9
urllib3==1.25.3
#!/usr/bin/env python
"""Handle getting WHOIS information from RiskIQ."""
import json
import logging
import sys
from argparse import ArgumentParser
from concurrent.futures import wait

from requests_futures.sessions import FuturesSession

USERNAME = ""
API_KEY = ""
AUTH = (USERNAME, API_KEY)
BASE_URL = "https://api.passivetotal.org"
WHOIS_URL = "%s/v2/whois" % (BASE_URL)
LOG_LEVEL = logging.DEBUG
OUTPUT_HEADERS = [
    'Admin',
    'Billing',
    'Contact Email',
    'Domain',
    'Expires',
    'Last Loaded',
    'Name',
    'Name Servers',
    'Organization',
    'Registered',
    'Telephone',
    'WHOIS Server',
    'Zone'
]


def gen_logger(name, log_level=logging.INFO):
    """Create a logger to be used between processes."""
    logger = logging.getLogger(name)
    logger.setLevel(log_level)
    shandler = logging.StreamHandler(sys.stdout)
    fmt = '\033[1;32m%(levelname)-5s %(module)s:%(funcName)s():'
    fmt += '%(lineno)d %(asctime)s\033[0m| %(message)s'
    shandler.setFormatter(logging.Formatter(fmt))
    logger.addHandler(shandler)
    return logger


LOG = gen_logger("RiskIQ-WHOIS", LOG_LEVEL)  # Gross location, but cleaner than print


def bulk_request(queries):
    """Prepare the requests to be launched into Internet space."""
    LOG.debug("[#] Preparing the queries to run.")
    queries = list(set(queries))  # Ensure we have no dupes
    session = FuturesSession(max_workers=len(queries))  # Not the smartest calc
    futures = [
        session.get(WHOIS_URL, json={'query': query}, auth=AUTH, timeout=10)
        for query in queries
    ]
    done, _ = wait(futures)
    results = list()
    LOG.info("[*] Processing request results.")
    for response in done:
        try:
            result = response.result()  # Give me the future
            if result.status_code != 200:
                raise Exception("Response code was not valid.")
            result = json.loads(result.content)
            results.append(result)
        except Exception as err:
            LOG.warning("[!] Failed result: %s" % err)
    return results


def format_output_pt(responses):
    """Format the PassiveTotal query results for CSV output."""
    formatted = list()
    for response in responses:
        record = [
            response.get('admin', None),
            response.get('billing', None),
            response.get('contactEmail', None),
            response.get('domain', None),
            response.get('expiresAt', None),
            response.get('lastLoadedAt', None),
            response.get('name', None),
            response.get('nameServers', None),
            response.get('organization', None),
            response.get('registered', None),
            response.get('telephone', None),
            response.get('whoisServer', None),
            response.get('zone', None),
        ]
        record = [str(x) for x in record]
        tabbed = '\t'.join(record)
        formatted.append(tabbed)
    return formatted


def main():
    """Be Human. Do human things."""
    parser = ArgumentParser()
    subs = parser.add_subparsers(dest='cmd')
    setup_parser = subs.add_parser('search')
    setup_parser.add_argument('-q', '--query', dest='query', required=False,
                              help='Single query to run.')
    setup_parser.add_argument('-f', '--file', dest='file', required=False,
                              help='File of queries to process.', type=str)
    setup_parser.add_argument('-o', '--out', dest='out', required=False,
                              help='File to write results out.', type=str)
    args = parser.parse_args()

    results = list()
    if args.cmd == 'search':
        if args.query:
            results = bulk_request([args.query])
        if args.file:
            try:
                items = [x.strip() for x in open(args.file).readlines()]
            except Exception as err:
                LOG.error("[X] Failed to process query file: %s" % err)
            else:
                results = bulk_request(items)
        if args.out and results:
            handle = open(args.out, 'w')
            file_headers = '\t'.join(OUTPUT_HEADERS)
            handle.write(file_headers + '\n')
            writable_results = format_output_pt(results)
            for result in writable_results:
                handle.write(result + "\n")
            handle.close()
    sys.exit(0)


if __name__ == '__main__':
    main()
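Since riskiq-whois.py writes the OUTPUT_HEADERS row followed by tab-separated records, the results can be read back with the standard csv module. A minimal sketch, assuming an output file named foo.csv as in the usage examples:

import csv

with open('foo.csv', newline='') as handle:
    reader = csv.DictReader(handle, delimiter='\t')
    for row in reader:
        # Column names come from OUTPUT_HEADERS, e.g. 'Domain', 'Registered', 'Expires'.
        print(row['Domain'], row['Registered'], row['Expires'])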