Last active
October 3, 2023 18:23
-
-
Save northwestcoder/8843f84e71250458c73610ac63069fa0 to your computer and use it in GitHub Desktop.
Get all downstream information for all DACs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Simple Command line to group all downstream Satori Datastores and Datasets by each Satori DAC | |
# tested with python 3 | |
# Usage: | |
# at a command line run: | |
# python get_all_dac_info.py.py | |
# You must fill in the three Satori account variables below | |
import json | |
import requests | |
import sys | |
satori_account_id = "FILL_IN" | |
satori_serviceaccount_id = "FILL_IN" | |
satori_serviceaccount_key = "FILL_IN" | |
satori_apihost = "app.satoricyber.com" | |
############################### | |
## No changes below this line | |
## unless experimenting | |
############################### | |
def satori_auth(satori_serviceaccount_id, satori_serviceaccount_key, satori_apihost): | |
# Get a Bearer Token for all the rest of this example | |
auth_headers = {'content-type': 'application/json','accept': 'application/json'} | |
auth_url = "https://{}/api/authentication/token".format(satori_apihost) | |
auth_body = json.dumps( | |
{ | |
"serviceAccountId": satori_serviceaccount_id, | |
"serviceAccountKey": satori_serviceaccount_key | |
}) | |
try: | |
r = requests.post(auth_url, headers=auth_headers, data=auth_body) | |
response = r.json() | |
satori_token = response["token"] | |
except Exception as err: | |
print("Bearer Token Failure: :", err) | |
print("Exception TYPE:", type(err)) | |
else: | |
return satori_token | |
# our API bearer token for the rest of this example: | |
satori_token = satori_auth(satori_serviceaccount_id, satori_serviceaccount_key, satori_apihost) | |
def get_dacs(): | |
headers = {'Authorization': 'Bearer {}'.format(satori_token),} | |
url = "https://{}/api/v1/data-access-controllers?accountId={}&pageSize=1000".format( | |
satori_apihost, | |
satori_account_id | |
) | |
try: | |
response = requests.get(url, headers=headers) | |
response.raise_for_status() | |
except requests.exceptions.RequestException as err: | |
print("Retrieval of dac data failed: :", err) | |
print("Exception TYPE:", type(err)) | |
else: | |
return response | |
def get_datasets(): | |
headers = {'Authorization': 'Bearer {}'.format(satori_token),} | |
url = "https://{}/api/v1/dataset?accountId={}&pageSize=1000".format( | |
satori_apihost, | |
satori_account_id | |
) | |
try: | |
response = requests.get(url, headers=headers) | |
response.raise_for_status() | |
except requests.exceptions.RequestException as err: | |
print("Retrieval of audit data failed: :", err) | |
print("Exception TYPE:", type(err)) | |
else: | |
return response | |
def get_datastores(): | |
headers = {'Authorization': 'Bearer {}'.format(satori_token),} | |
url = "https://{}/api/v1/datastore?accountId={}&pageSize=1000".format( | |
satori_apihost, | |
satori_account_id | |
) | |
try: | |
response = requests.get(url, headers=headers) | |
response.raise_for_status() | |
except requests.exceptions.RequestException as err: | |
print("Retrieval of audit data failed: :", err) | |
print("Exception TYPE:", type(err)) | |
else: | |
return response | |
def get_datastore_name(datastore_id): | |
headers = {'Authorization': 'Bearer {}'.format(satori_token),} | |
url = "https://{}/api/v1/datastore/{}".format(satori_apihost, datastore_id) | |
try: | |
response = requests.get(url, headers=headers) | |
response.raise_for_status() | |
except requests.exceptions.RequestException as err: | |
print("Retrieval of audit data failed: :", err) | |
print("Exception TYPE:", type(err)) | |
else: | |
return response.json()['name'] | |
def find_one_datastore(this_datastore_id): | |
datastore_name = get_datastore_name(this_datastore_id) | |
datasets = get_datasets() | |
print(' included in the following Datasets'.format(datastore_name)) | |
for location in datasets.json()['records']: | |
for item in location['includeLocations']: | |
if item['dataStoreId']== this_datastore_id: | |
print(" " + location['name']) | |
print(' excluded in the following Datasets'.format(datastore_name)) | |
for location in datasets.json()['records']: | |
for item in location['excludeLocations']: | |
if item['dataStoreId']== this_datastore_id: | |
print(" " + location['name']) | |
def group_everything_by_dac(): | |
datasets = get_datasets() | |
datastores = get_datastores() | |
for dac in get_dacs().json()['records']: | |
print('DAC NAME: ' + dac['name'] + "\n") | |
for datastore in datastores.json()['records']: | |
if datastore['dataAccessControllerId'] == dac['id']: | |
print('\n Datastore Name: ' + datastore['name'] + '\n Datastore ID: ' + datastore['id']) | |
find_one_datastore(datastore['id']) | |
print("\n") | |
print("__________________________________________________________\n") | |
def main(): | |
group_everything_by_dac() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment