Created
December 11, 2019 00:24
-
-
Save thesubtlety/01bac27462b1168d6ef2a4787fb4eff4 to your computer and use it in GitHub Desktop.
Submit IPs/CIDRs to Shodan for scanning and download results
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import time | |
import shodan | |
import netaddr | |
import ipaddress | |
''' | |
0. pip install shodan, ipaddress, netaddr | |
1. export SHODAN_API_KEY=nnnnnnnnnnnnnnnn | |
2. python3 bulkip-shodan-scanner.py <ip/cidr file> | |
2.1 check the status of current scans: python3 bulkip-shodan-scanner.py -s | |
Download Shodan data from a list of CIDRs or IP addresses. This script basically | |
batches the IPs in API requests of 100 and downloads using the CLI. | |
Scan statuses of DONE only mean the scan has been picked up by Shodan, not | |
necessarily that it's completed. Use the Shodan data stream if you want to | |
be notified immediately, or wait 24 hours or so for scans to process and redownload. | |
''' | |
API_KEY = os.environ['SHODAN_API_KEY'] | |
api = shodan.Shodan(API_KEY) | |
def print_status_processing(scanp): | |
for scan in scanp['matches']: | |
if scan['status'] == "PROCESSING": | |
print("{}\t{}\t{}".format(scan['id'],scan['status'],scan['created'])) | |
def get_all_processing_scans(): | |
scans = api.scans() | |
pages = scans['total'] // 100 + (scans['total'] % 100 > 0) | |
print("ID\t\t\tSTATUS\t\tCREATED") | |
for p in range(pages): | |
scanp = api.scans(page=p) | |
print_status_processing(scanp) | |
time.sleep(1) | |
def main(): | |
if len(sys.argv) == 2 and sys.argv[1] == "-s": | |
get_all_processing_scans() | |
exit(1) | |
if len(sys.argv) < 2: | |
print("Usage: {} <IP/CIDR list file>".format(sys.argv[0])) | |
exit(1) | |
with open(sys.argv[1],'r') as f: | |
ip_list = f.readlines() | |
ip_array = [] | |
for e in ip_list: | |
if "/" not in e: | |
e += "/32" | |
for host in ipaddress.ip_network(e.strip(), False): | |
ip_array.append(host.compressed) | |
ip_array = list(set(ip_array)) | |
print("Scanning {} IPs...".format(len(ip_array))) | |
# sanity check | |
api_info = api.info() | |
scan_credits = api_info['scan_credits'] | |
if scan_credits < len(ip_array): | |
print("You're trying to scan {} IP addresses but only have {} scan credits. Exiting...".format(len(ip_array),scan_credits)) | |
exit(1) | |
# scan arrays must be split into 100 IPs per request | |
for scan_array in [ip_array[i*100:(i+1)*100] for i in range((len(ip_array) + 100-1) // 100)]: | |
try: | |
print("\nScan request for {}".format(scan_array)) | |
scan = api.scan(scan_array) | |
print(scan) | |
total_time = 0 | |
while True: | |
time.sleep(1) | |
scan_status = api.scan_status(scan['id'])['status'] | |
if scan_status == "DONE": | |
print("Downloading: shodan download --limit -1 {}-results scan:{}".format(scan['id'], scan['id'])) | |
os.system("shodan download --limit -1 {}-results scan:{}".format(scan['id'], scan['id'])) | |
#os.system("shodan convert {}-results.json.gz csv".format(scan['id'])) | |
break | |
else: | |
# one request/second max - sleep a few, status report every 5 minutes | |
if total_time == 0: | |
print("Scan not completed, sleeping...") | |
elif total_time % 300 == 0: | |
print("Scan still not completed...") | |
total_time += 3 | |
time.sleep(3) | |
except Exception as e: | |
print('[x] Error: %s' % e) | |
print("\tProbably didn't download: shodan download --limit -1 {}-results scan:{}".format(scan['id'], scan['id'])) | |
time.sleep(5) # hack for rate limiting | |
print("[!] Completed") | |
print("\tUse shodan parse <file> to view your results") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey 👋
Thanks for this
I run into an issue trying this
My list looks like
Can you tell what's the issue is ?
cheers