Created
October 3, 2023 16:08
-
-
Save northwestcoder/281b63eb8121275d9d21a4133ece9939 to your computer and use it in GitHub Desktop.
Group Satori Datastores by DACs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Simple Command line to group all Satori Datastores by each Satori DAC | |
# tested with python 3 | |
# Usage: | |
# at a command line run: | |
# python find_datastores.py | |
# You must fill in the three Satori account variables below | |
import json | |
import requests | |
import sys | |
satori_account_id = "FILL_IN" | |
satori_serviceaccount_id = "FILL_IN" | |
satori_serviceaccount_key = "FILL_IN" | |
satori_apihost = "app.satoricyber.com" | |
############################### | |
## No changes below this line | |
## unless experimenting | |
############################### | |
def satori_auth(satori_serviceaccount_id, satori_serviceaccount_key, satori_apihost): | |
# Get a Bearer Token for all the rest of this example | |
auth_headers = {'content-type': 'application/json','accept': 'application/json'} | |
auth_url = "https://{}/api/authentication/token".format(satori_apihost) | |
auth_body = json.dumps( | |
{ | |
"serviceAccountId": satori_serviceaccount_id, | |
"serviceAccountKey": satori_serviceaccount_key | |
}) | |
try: | |
r = requests.post(auth_url, headers=auth_headers, data=auth_body) | |
response = r.json() | |
satori_token = response["token"] | |
except Exception as err: | |
print("Bearer Token Failure: :", err) | |
print("Exception TYPE:", type(err)) | |
else: | |
return satori_token | |
# our API bearer token for the rest of this example: | |
satori_token = satori_auth(satori_serviceaccount_id, satori_serviceaccount_key, satori_apihost) | |
def get_dacs(): | |
headers = {'Authorization': 'Bearer {}'.format(satori_token),} | |
url = "https://{}/api/v1/data-access-controllers?accountId={}&pageSize=1000".format( | |
satori_apihost, | |
satori_account_id | |
) | |
try: | |
response = requests.get(url, headers=headers) | |
response.raise_for_status() | |
except requests.exceptions.RequestException as err: | |
print("Retrieval of dac data failed: :", err) | |
print("Exception TYPE:", type(err)) | |
else: | |
return response | |
def get_datastores(): | |
headers = {'Authorization': 'Bearer {}'.format(satori_token),} | |
url = "https://{}/api/v1/datastore?accountId={}&pageSize=1000".format( | |
satori_apihost, | |
satori_account_id | |
) | |
try: | |
response = requests.get(url, headers=headers) | |
response.raise_for_status() | |
except requests.exceptions.RequestException as err: | |
print("Retrieval of audit data failed: :", err) | |
print("Exception TYPE:", type(err)) | |
else: | |
return response | |
def group_datastores_by_dac(): | |
datastores = get_datastores() | |
for dac in get_dacs().json()['records']: | |
print(dac['name'] + ":\n") | |
for datastore in datastores.json()['records']: | |
if datastore['dataAccessControllerId'] == dac['id']: | |
print(datastore['name'] + ' (' + datastore['id'] + ')') | |
print("\n") | |
def main(): | |
group_datastores_by_dac() | |
if __name__ == "__main__": | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment