Skip to content

Instantly share code, notes, and snippets.

@umbernhard
Created March 27, 2017 20:20
Show Gist options
  • Save umbernhard/22a1a4a31fc52c3e7409ee804bc4949f to your computer and use it in GitHub Desktop.
Save umbernhard/22a1a4a31fc52c3e7409ee804bc4949f to your computer and use it in GitHub Desktop.
Pulls down compressed Censys historical scans for given protocols over a given period of time.
import requests
import sys
# Protocols to grab. The fully qualified series names can be found here:
# https://censys.io/api/v1/data (feed into jq for readability)
protos = ["21-ftp-banner-full_ipv4",
"22-ssh-banner-full_ipv4",
"23-telnet-banner-full_ipv4",
"80-http-get-full_ipv4",
"443-https-ssl_2-full_ipv4",
"443-https-tls-full_ipv4",
"7547-cwmp-get-full_ipv4"]
start_date = "20160719" # The first day of scans we care about (YYYYMMDD, inclusive)
end_date = "20170228" # the last day of scans we care about (YYYYMMDD, inclusive)
auth=('XXX', 'XXX') # Censys (API ID, secret) credentials. See the code examples at the bottom of the page here: https://censys.io/api
data_path = "path_to_date_directory" # where should we put our data? Must already contain one subdirectory per protocol name in `protos`.
# function adapted from https://stackoverflow.com/questions/16694907/how-to-download-large-file-in-python-with-requests-py
def download_file(url, proto):
    """Stream the file at ``url`` into ``data_path/<proto>/``.

    The basename of ``url`` is used as the local filename. The response is
    streamed in 1 KiB chunks so arbitrarily large scan files never have to
    fit in memory.

    Args:
        url: Direct download URL for a (compressed) scan file.
        proto: Protocol series name; selects the destination subdirectory,
            which must already exist under ``data_path``.

    Returns:
        The local filename (basename of ``url``) the data was written to.
    """
    local_filename = url.split('/')[-1]
    # NOTE the stream=True parameter: defer body download so we can
    # iterate it chunk-by-chunk instead of loading it all at once.
    r = requests.get(url, stream=True, auth=auth)
    try:
        # Each protocol gets its own directory
        with open(data_path + '/' + proto + '/' + local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk: # filter out keep-alive new chunks
                    f.write(chunk)
    finally:
        # Bug fix: the original never closed the streamed response, leaking
        # the underlying connection. Always release it, even on write errors.
        r.close()
    return local_filename
def main():
    """Download compressed historical Censys scans for every series in ``protos``.

    Walks the Censys data API: fetches the series index, then each series'
    details, then downloads every historical scan whose timestamp falls
    within [start_date, end_date]. Exits with status 1 on any non-200
    API response.
    """
    # get urls for all of our protocols
    r = requests.get('https://censys.io/api/v1/data', auth=auth)
    if r.status_code != 200:
        print("request to censys data api failed with status code:", r.status_code)
        sys.exit(1)
    raw_series = r.json()["raw_series"]
    for proto in protos:
        u = requests.get(raw_series[proto]['details_url'], auth=auth)
        if u.status_code != 200:
            print("request to censys data api at", raw_series[proto]['details_url'],
                  "failed with status code:", u.status_code)
            sys.exit(1)
        # Parse the series JSON once; the original re-parsed it on every
        # download inside the inner loop.
        series = u.json()
        # iterate over all the historical scans
        for result in series['results']['historical']:
            # check for the entries within our date range; YYYYMMDD strings
            # compare correctly as plain lexicographic comparisons
            ts = result['timestamp'][:8]
            if start_date <= ts <= end_date:
                f = requests.get(result['details_url'], auth=auth)
                if f.status_code != 200:
                    # Bug fix: the original referenced the undefined name
                    # `results` here, crashing with NameError on any
                    # non-200 response instead of printing the URL.
                    print("request to censys data api at", result['details_url'],
                          "failed with status code:", f.status_code)
                    sys.exit(1)
                # Note that we're getting the compressed files for the sake
                # of bandwidth saving
                download_file(
                    f.json()['files']["zgrab-results"]["compressed_download_path"],
                    series['protocol'])


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment