Skip to content

Instantly share code, notes, and snippets.

@northwestcoder
Created October 3, 2023 16:08
Show Gist options
  • Save northwestcoder/281b63eb8121275d9d21a4133ece9939 to your computer and use it in GitHub Desktop.
Save northwestcoder/281b63eb8121275d9d21a4133ece9939 to your computer and use it in GitHub Desktop.
Group Satori Datastores by DACs
# Simple Command line to group all Satori Datastores by each Satori DAC
# tested with python 3
# Usage:
# at a command line run:
# python find_datastores.py
# You must fill in the three Satori account variables below
import json
import requests
import sys
satori_account_id = "FILL_IN"
satori_serviceaccount_id = "FILL_IN"
satori_serviceaccount_key = "FILL_IN"
satori_apihost = "app.satoricyber.com"
###############################
## No changes below this line
## unless experimenting
###############################
def satori_auth(satori_serviceaccount_id, satori_serviceaccount_key, satori_apihost):
# Get a Bearer Token for all the rest of this example
auth_headers = {'content-type': 'application/json','accept': 'application/json'}
auth_url = "https://{}/api/authentication/token".format(satori_apihost)
auth_body = json.dumps(
{
"serviceAccountId": satori_serviceaccount_id,
"serviceAccountKey": satori_serviceaccount_key
})
try:
r = requests.post(auth_url, headers=auth_headers, data=auth_body)
response = r.json()
satori_token = response["token"]
except Exception as err:
print("Bearer Token Failure: :", err)
print("Exception TYPE:", type(err))
else:
return satori_token
# our API bearer token for the rest of this example:
satori_token = satori_auth(satori_serviceaccount_id, satori_serviceaccount_key, satori_apihost)
def get_dacs():
headers = {'Authorization': 'Bearer {}'.format(satori_token),}
url = "https://{}/api/v1/data-access-controllers?accountId={}&pageSize=1000".format(
satori_apihost,
satori_account_id
)
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
except requests.exceptions.RequestException as err:
print("Retrieval of dac data failed: :", err)
print("Exception TYPE:", type(err))
else:
return response
def get_datastores():
headers = {'Authorization': 'Bearer {}'.format(satori_token),}
url = "https://{}/api/v1/datastore?accountId={}&pageSize=1000".format(
satori_apihost,
satori_account_id
)
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
except requests.exceptions.RequestException as err:
print("Retrieval of audit data failed: :", err)
print("Exception TYPE:", type(err))
else:
return response
def group_datastores_by_dac():
datastores = get_datastores()
for dac in get_dacs().json()['records']:
print(dac['name'] + ":\n")
for datastore in datastores.json()['records']:
if datastore['dataAccessControllerId'] == dac['id']:
print(datastore['name'] + ' (' + datastore['id'] + ')')
print("\n")
def main():
group_datastores_by_dac()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment