Skip to content

Instantly share code, notes, and snippets.

Last active August 17, 2023 17:04
Show Gist options
  • Save northwestcoder/148aa0b59fd8ffe5297fa59d775c3216 to your computer and use it in GitHub Desktop.
Save northwestcoder/148aa0b59fd8ffe5297fa59d775c3216 to your computer and use it in GitHub Desktop.
Send Satori Audit Data to PagerDuty
# This is a lightweight integration prototype that sends
# Satori audit data to PagerDuty. In this example we
# specify that any query which was blocked by Satori should also
# create a new incident in PagerDuty.
# We also check for any access or self service rules. If we find any,
# then we should not trigger PagerDuty as this is a type of false positive.
# We further refine this integration by using the Satori Audit ID
# as our PagerDuty "incident ID" - this prevents duplicate incidents
# from showing up in PagerDuty. The mapping of a Satori Audit ID to
# a PagerDuty Incident ID results in nice, clean UPSERT behavior.
# Finally, we specified a 1 hour duration of audit data to pull from Satori.
# In business terms this prototype is meant to be automated in AWS Lambda,
# Google Cloud Run or similar, with a periodicity of 1 hour.
import json
import requests
import datetime
import time
import io
# Satori credentials used to obtain a bearer token.
# See the Satori API documentation for more info.
satori_account_id = ""
satori_serviceaccount_id = ""
satori_serviceaccount_key = ""
# Host name only (no "https://" prefix); it is interpolated into every Satori URL.
satori_apihost = ""
# PagerDuty API Key, this needs to be set up in their web UX
pagerduty_apikey = ""
# PagerDuty API URL (REST "create incident" endpoint), this can be left as-is
# for this example
pagerduty_incident_url = "https://api.pagerduty.com/incidents"
# the pager duty service ID, we tested using the "Default Service"
# you can get the ID by navigating to your PagerDuty Service and getting the ID from the URL
pagerduty_service_id = ""
# the pager duty FROM / SENT BY email address (sent as the "From" header)
pagerduty_sentby = ""
# how many hours ago of Satori audit data do we want?
hours_ago = 1
# Which Satori action are we searching for?
action_type = "ACTION_REQUEST_BLOCK"
# If you are just testing or learning you should
# not have to change anything below this comment
def satori_auth(satori_serviceaccount_id, satori_serviceaccount_key, satori_apihost):
    """Exchange Satori service-account credentials for an API bearer token.

    Args:
        satori_serviceaccount_id: ID of the Satori service account.
        satori_serviceaccount_key: Secret key of that service account.
        satori_apihost: Host name of the Satori API (without a scheme).

    Returns:
        The value of the "token" field of the authentication response.

    Raises:
        Exception: Any error raised by the HTTP request, the JSON decoding or
            the token lookup. It is reported on stdout and then re-raised,
            because nothing else in this script can work without a token.
    """
    auth_headers = {'content-type': 'application/json', 'accept': 'application/json'}
    auth_url = "https://{}/api/authentication/token".format(satori_apihost)
    auth_body = json.dumps({
        "serviceAccountId": satori_serviceaccount_id,
        "serviceAccountKey": satori_serviceaccount_key,
    })
    try:
        # A timeout keeps a scheduled job from hanging forever on a dead host.
        r = requests.post(auth_url, headers=auth_headers, data=auth_body, timeout=30)
        response = r.json()
        satori_token = response["token"]
    except Exception as err:
        print("Bearer Token Failure: :", err)
        print("Exception TYPE:", type(err))
        # Re-raise instead of falling through to an unbound local variable.
        raise
    return satori_token
# Authenticate once at import time; every Satori request below reuses this
# bearer token in its Authorization header.
satori_token = satori_auth(
    satori_serviceaccount_id=satori_serviceaccount_id,
    satori_serviceaccount_key=satori_serviceaccount_key,
    satori_apihost=satori_apihost,
)
def get_custom_tag_audits():
    """Retrieve Satori audit entries ("data flows") of type `action_type`.

    The query window ends now and starts `hours_ago` hours earlier; both
    bounds are sent as Unix timestamps in milliseconds.

    Returns:
        The `requests` response object of the audit query; callers read
        `.json()['records']` from it.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            server answers with an HTTP error status. The error is reported
            on stdout and then re-raised.
    """
    # Read the clock once so both ends of the window share the same "now".
    now_seconds = int(time.time())
    unix_endtime = now_seconds * 1000
    unix_starttime = (now_seconds - (hours_ago * 3600)) * 1000
    # build request to rest API for audit entries, aka "data flows"
    # only search for entries of the configured action type in that window
    headers = {'Authorization': 'Bearer {}'.format(satori_token)}
    # NOTE(review): the format arguments were missing from the source and are
    # reconstructed from the placeholder order (host, account, from, to,
    # action filter) - confirm against the Satori API documentation.
    auditurl = "https://{}/api/data-flow/{}/query?from={}&to={}&actionTypesFilter={}".format(
        satori_apihost,
        satori_account_id,
        unix_starttime,
        unix_endtime,
        action_type,
    )
    try:
        response = requests.get(auditurl, headers=headers, timeout=30)
        response.raise_for_status()
    except requests.exceptions.RequestException as err:
        print("Retrieval of audit data failed: :", err)
        print("Exception TYPE:", type(err))
        # Re-raise: there is no response to hand back to the caller.
        raise
    return response
def get_access_rules(dataset_id):
    """Count the access rules defined on one Satori dataset.

    If any are found, then we should NOT send an alert to PagerDuty,
    because it's a false positive to assume that access was blocked
    when in fact the end user could have the option to request access
    using one of these access rules.

    Args:
        dataset_id: Satori dataset ID, passed as the `parentId` query parameter.

    Returns:
        The sum of the "count" fields returned by the manual-approval
        (access-request) and self-service rule endpoints.

    Raises:
        requests.exceptions.RequestException: If either request fails or
            returns an HTTP error status. The error is reported on stdout and
            then re-raised, so a failed lookup is never silently counted.
    """
    headers = {'Authorization': 'Bearer {}'.format(satori_token)}
    rule_urls = (
        # first type of access rule: manual approvals
        "https://{}/api/v1/data-access-rule/access-request?parentId={}",
        # second type of access rule: self-service rules
        "https://{}/api/v1/data-access-rule/self-service?parentId={}",
    )
    rule_count = 0
    for url_template in rule_urls:
        # NOTE(review): the format arguments were missing from the source and
        # are reconstructed as (API host, dataset id) - confirm.
        url = url_template.format(satori_apihost, dataset_id)
        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as err:
            print("Retrieval of access rules failed: :", err)
            print("Exception TYPE:", type(err))
            raise
        rule_count += response.json()['count']
    return rule_count
def send_to_pagerduty(flow_id, user, query):
    """Create a PagerDuty incident for one blocked Satori query.

    We kept our PagerDuty POST payload to a bare minimum. The Satori audit ID
    is reused as the PagerDuty `incident_key`, so re-sending the same audit
    entry does not open a second incident. There is a wealth of additional
    information you can send from Satori to PagerDuty; see the PagerDuty
    REST API documentation for the incident payload.

    Args:
        flow_id: Satori audit ID (string); used in the title and as incident key.
        user: Identity that ran the query (string).
        query: Text of the query that was blocked (string).

    Returns:
        None. Sending is best-effort: failures are printed, not raised.
    """
    payload = {
        "incident": {
            "type": "incident",
            "title": "Satori Audit ID " + flow_id,
            "service": {
                # You need to look up the Service ID from PagerDuty
                "id": pagerduty_service_id,
                "type": "service_reference",
            },
            "urgency": "high",
            "incident_key": flow_id,
            "body": {
                "type": "incident_body",
                "details": user + " tried to run the following sensitive query but it was blocked by Satori:\n\n" + query,
            },
        }
    }
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/vnd.pagerduty+json;version=2",
        "From": pagerduty_sentby,
        "Authorization": "Token token=" + pagerduty_apikey,
    }
    try:
        requests.request(
            "POST", pagerduty_incident_url, json=payload, headers=headers, timeout=30
        )
    except Exception as err:
        # Best-effort: one failed alert must not stop the remaining records.
        print("PagerDuty Failure: :", err)
        print("Exception TYPE:", type(err))
# This is just for testing purposes
# There are a thousand different ways to automate this :)
# for example in AWS Lambda you would wrap all of the above inside a lambda context function
# and then trigger the function using AWS CloudWatch and/or EventBridge.
# the next few lines are just for testing and study.
for item in get_custom_tag_audits().json()['records']:
    flow_id = str(item['flow_id'])
    user = str(item['identity']['name'])
    query = str(item['query'])
    # if you are testing and don't get a PagerDuty incident created
    # it means every Satori dataset associated with the audit ID has one or
    # more access rules and self-service rules defined
    #
    # The incident key is the audit ID, so alert once per record as soon as
    # one dataset without rules is found rather than once per dataset.
    if any(get_access_rules(dataset['dataset_id']) == 0 for dataset in item['datasets']):
        send_to_pagerduty(flow_id, user, query)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment