Created
November 1, 2020 19:12
-
-
Save gdassori/1070d67fc8fde7e8688702ebca4402b4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
import os | |
import sys | |
import json | |
import uuid | |
import argparse | |
import logging | |
import coloredlogs | |
import requests | |
# Fallback values for the command line options; each one can be overridden
# on the CLI (see init()).
DEFAULTS = dict(
    guidfile='client-guid',
    outfile='last-dump',
    verfile='last-version',
    instance='Worldwide',
)
def main(options):
    """Run one update cycle using the options namespace produced by init().

    Steps: optionally discard previous state (--force), reset the delta file,
    compare the published version with the last processed one, and - when
    they differ - download the addresses, write the delta and the full dump,
    and record the new version.

    Exits the process with status 0 when the version has not changed.
    """
    logging.info('Starting')

    if options.force:
        # Forcing means forgetting everything from earlier runs. The delta
        # file is optional, so it only joins the list when one was requested.
        stale_paths = [options.verfile, options.outfile]
        if options.deltafile:
            stale_paths.insert(0, options.deltafile)
        for stale in stale_paths:
            if os.path.isfile(stale):
                os.remove(stale)

    wants_delta = options.deltafile is not None

    # Start from an empty delta so a run without changes leaves valid JSON
    if wants_delta:
        write_json_file(options.deltafile, {})

    # Nothing to do when the published version equals the one on record
    # (a forced run removed the record above, so it never matches)
    new_version, previous_version = get_versions(options.version_url,
                                                 options.verfile)
    if new_version == previous_version:
        logging.info('Version matches previous. No update')
        sys.exit(0)

    # Fetch and de-duplicate the current addresses
    ip_struct = get_ip_addresses(options.data_url, options.optional)

    # The delta must be computed before the previous dump is overwritten
    if wants_delta:
        generate_delta(ip_struct, options.outfile, options.deltafile)
        logging.info('Delta File: {}'.format(options.deltafile))

    # Persist the new dump, then remember which version it corresponds to
    write_json_file(options.outfile, ip_struct, True)
    commit_processed_version(options.verfile, new_version)
    logging.info('Output File: {}'.format(options.outfile))
    logging.info('Complete!')
def write_json_file(filename, data, pretty=False):
    """Serialize `data` as JSON into `filename`, replacing any existing file.

    When `pretty` is true the output is indented by two spaces; otherwise the
    json module's default single-line layout is used.
    """
    logging.debug('Writing JSON File : {}'.format(filename))
    indent = 2 if pretty else None
    with open(filename, 'w') as out:
        json.dump(data, out, indent=indent)
def get_versions(url, filename):
    """Return `(current_version, previous_version)`.

    `current_version` is what the version endpoint at `url` reports.
    `previous_version` is the first line of `filename` when that file exists;
    otherwise it is the placeholder string "42", which stands in for
    "nothing processed yet".
    """
    logging.debug('Downloading Version Information')
    latest = get_version_info(url)

    # Only a previous run leaves a state file behind
    last_seen = read_single_state(filename) if os.path.isfile(filename) else "42"

    template = ('No version change: {}' if latest == last_seen
                else 'New version discovered: {}')
    logging.debug(template.format(latest))
    return (latest, last_seen)
def commit_processed_version(filename, version):
    """Record `version` in `filename` as the last version fully processed."""
    notice = 'Writing last processed version to: {}'.format(filename)
    logging.debug(notice)
    write_single_state(filename, version)
def get_version_info(url, timeout=30):
    """Fetch the version document at `url` and return its 'latest' entry.

    Args:
        url: Address of the version endpoint.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout `requests` can block indefinitely on a stalled
            connection.

    Returns:
        The value stored under 'latest', or None when the key is absent.

    Raises:
        requests.RequestException: On connection problems, timeouts, or an
            HTTP error status.
    """
    response = requests.get(url, timeout=timeout)
    # Surface HTTP failures here instead of trying to interpret an error body
    response.raise_for_status()
    version_info = response.json()
    if 'latest' in version_info:
        return version_info['latest']
    return None
def read_single_state(filename):
    """Return the first line of `filename` with trailing whitespace removed."""
    logging.debug('Read state file: {}'.format(filename))
    with open(filename, 'r') as state_file:
        first_line = state_file.readline()
    return first_line.rstrip()
def write_single_state(filename, value):
    """Overwrite `filename` with the text form of `value` plus a newline."""
    logging.debug('Write state file: {}'.format(filename))
    line = str(value) + '\n'
    with open(filename, 'w') as state_file:
        state_file.write(line)
def generate_delta(data, filename, deltafile):
    """Write the difference between `data` and the previous dump to `deltafile`.

    `filename` is the JSON dump from the previous run; when it does not exist
    the previous state is treated as empty. The delta is a JSON object with
    an 'add' list (entries of `data` missing from the previous dump) and a
    'remove' list (entries of the previous dump missing from `data`).
    """
    logging.debug('Generating Delta')

    # Load the previous run's dump, if there is one
    previous = {}
    if os.path.isfile(filename):
        with open(filename, 'r') as earlier_dump:
            previous = json.load(earlier_dump)

    delta = {
        'add': [ip for ip in data if ip not in previous],
        'remove': [ip for ip in previous if ip not in data],
    }
    write_json_file(deltafile, delta, True)
def init_deltafile(filename):
    """Reset an existing delta file to an empty JSON object.

    Nothing is written when `filename` does not already exist.
    """
    logging.debug('Initializing Delta File : {}'.format(filename))
    if not os.path.isfile(filename):
        return
    with open(filename, 'w') as delta_file:
        # Empty object in case there are no changes
        delta_file.write('{}' + '\n')
def get_ip_addresses(url, include_optional, timeout=30):
    """Download the endpoint records at `url` and collect their IP ranges.

    Args:
        url: Address of the endpoints service.
        include_optional: When true, addresses from records whose 'required'
            field is false are collected as well.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A dict keyed by the entries of each record's 'ips' list. A dict is
        used only to de-duplicate; every value is the constant 42.

    Raises:
        requests.RequestException: On connection problems, timeouts, or an
            HTTP error status.
        ValueError: When the response body is not a JSON list of records.
    """
    logging.debug('Include optional IPs: {}'.format(include_optional))

    response = requests.get(url, timeout=timeout)
    # Fail loudly on an HTTP error: treating an error body as data would
    # yield an empty result that callers then persist as the new state.
    response.raise_for_status()
    records = response.json()
    if not isinstance(records, list):
        raise ValueError(
            'Unexpected endpoint payload: expected a list, got {}'.format(
                type(records).__name__))

    # Accumulate IPs in a dict to de-duplicate across records
    ips = {}
    for record in records:
        addresses = record.get('ips')
        if not addresses:
            # Records without addresses contribute nothing
            continue
        if record['required'] or include_optional:
            ips.update(dict.fromkeys(addresses, 42))
    return ips
def _build_parser(defaults):
    """Return the command line parser; `defaults` supplies fallback values.

    Every flag is registered with separate short and long option strings so
    that both spellings (for example -D and --debug) are accepted.
    """
    parser = argparse.ArgumentParser(
        description='Get Microsoft Office 365 IP lists.')
    parser.add_argument('-D', '--debug', dest='debug',
                        help='Full download output',
                        action='store_true')
    parser.add_argument('-f', '--force', dest='force',
                        help='Download update even if version has not changed',
                        action='store_true')
    parser.add_argument('-o', '--outfile', dest='outfile',
                        help='Full download output',
                        default=defaults['outfile'])
    parser.add_argument('-v', '--verfile', dest='verfile',
                        help='File to store version infomation',
                        default=defaults['verfile'])
    parser.add_argument('-d', '--deltafile', dest='deltafile',
                        help='Generate delta to file',
                        default=None)
    parser.add_argument('-g', '--guidfile', dest='guidfile',
                        help='File to load guid from. Will generate if file not found',
                        default=defaults['guidfile'])
    parser.add_argument('-i', '--instance', dest='instance',
                        help='Microsoft Office 365 Instance',
                        choices=['Worldwide', 'China', 'Germany',
                                 'USGovDoD', 'USGovGCCHigh'],
                        default=defaults['instance'])
    # store_false: options.optional is True unless this flag is given
    parser.add_argument('-p', '--disable_optional_ips', dest='optional',
                        help="Do not include optional IPs",
                        action='store_false')
    return parser


def init():
    '''
    init()
    Handle command line args, setup log, etc..

    Returns the parsed argparse namespace, extended with:
      client_guid - GUID read from (or generated and written to) the guid file
      version_url - URL of the version endpoint for the chosen instance
      data_url    - URL of the endpoints (IP data) service for that instance
    '''
    # Configure log: start at DEBUG, lowered below unless --debug was given
    coloredlogs.install(level='DEBUG',
                        fmt='%(asctime)s %(levelname)s %(message)s')

    # Suppress the HTTP libraries' own log output
    logging.getLogger('requests').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)

    # Handle command line args
    options = _build_parser(DEFAULTS).parse_args()

    # Without --debug, step the log level down from DEBUG
    if not options.debug:
        coloredlogs.decrease_verbosity()

    # Read client guid from file or generate and write to file for
    # subsequent runs. Note: Microsoft asks for a unique UUID per "system"
    # that accesses the API
    if os.path.isfile(options.guidfile):
        options.client_guid = read_single_state(options.guidfile)
    else:
        # Keep the guid a str, the same type as when it is read back later
        options.client_guid = str(uuid.uuid4())
        write_single_state(options.guidfile, options.client_guid)

    # Build the URLs based on the Instance selection and our guid
    base_url = 'https://endpoints.office.com'
    options.version_url = '{}/version/{}/?clientrequestid={}'.format(
        base_url, options.instance, options.client_guid)
    options.data_url = '{}/endpoints/{}/?clientrequestid={}'.format(
        base_url, options.instance, options.client_guid)
    return options
if __name__ == '__main__':
    # Resolve the command line into options first, then run the update
    cli_options = init()
    main(cli_options)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment