Skip to content

Instantly share code, notes, and snippets.

@northwestcoder
Last active November 8, 2022 16:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save northwestcoder/7b948a92b30dab5d9c9c8e31efc1fecd to your computer and use it in GitHub Desktop.
Save northwestcoder/7b948a92b30dab5d9c9c8e31efc1fecd to your computer and use it in GitHub Desktop.
AWS Lambda to Push Satori Audit Data into AWS OpenSearch
import awswrangler as wr
import requests
import time
import datetime
import io
import pandas
import boto3
import base64
from botocore.exceptions import ClientError
import json
secret_name = "CHANGE_TO_YOUR_SECRET_NAME"
region_name = "CHANGE_TO_YOUR_REGION"
os_host = "CHANGE_TO_YOUR_OPENSEARCH_DOMAIN"
the_index_name = "satori_test"
def get_secrets():
# Create a Secrets Manager client
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
# In this sample we only handle the specific exceptions for the 'GetSecretValue' API.
# See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
# We rethrow the exception by default.
try:
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
except ClientError as e:
if e.response['Error']['Code'] == 'DecryptionFailureException':
# Secrets Manager can't decrypt the protected secret text using the provided KMS key.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InternalServiceErrorException':
# An error occurred on the server side.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InvalidParameterException':
# You provided an invalid value for a parameter.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InvalidRequestException':
# You provided a parameter value that is not valid for the current state of the resource.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'ResourceNotFoundException':
# We can't find the resource that you asked for.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
else:
# Decrypts secret using the associated KMS key.
# Depending on whether the secret is a string or binary, one of these fields will be populated.
if 'SecretString' in get_secret_value_response:
secret = get_secret_value_response['SecretString']
return json.loads(secret)
else:
decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])
return decoded_binary_secret
# Your code goes here.
def get_satori_audit_data(lambda_secrets):
satori_serviceaccount_id = lambda_secrets['satori_service_id']
satori_serviceaccount_key = lambda_secrets['satori_service_key']
satori_account_id = lambda_secrets['satori_account_id']
satori_host = "app.satoricyber.com"
# This function retrieves Satori audit entries from the last 15 minutes
min15ago = datetime.datetime.now() - datetime.timedelta(minutes=15)
unix_time_start = min15ago.strftime("%s") + "000"
unix_time_end = str(int(min15ago.strftime("%s")) + (900)) + "000"
# Authenticate to Satori for a bearer token
authheaders = {'content-type': 'application/json','accept': 'application/json'}
url = "https://{}/api/authentication/token".format(satori_host)
try:
r = requests.post(url,
headers=authheaders,
data='{"serviceAccountId": "' + satori_serviceaccount_id +
'", "serviceAccountKey": "' + satori_serviceaccount_key + '"}')
response = r.json()
satori_token = response["token"]
except Exception as err:
print("Bearer Token Failure: :", err)
print("Exception TYPE:", type(err))
# build request to rest API for audit entries, aka "data flows"
payload = {}
headers = {
'Authorization': 'Bearer {}'.format(satori_token),
}
auditurl = "https://{}/api/data-flow/{}/export?from={}&to={}".format(satori_host,
satori_account_id,
unix_time_start,
unix_time_end
)
try:
response = requests.get(auditurl, headers=headers, data=payload)
response.raise_for_status()
except requests.exceptions.RequestException as err:
print("Retrieval of audit data failed: :", err)
print("Exception TYPE:", type(err))
else:
fileobj = io.StringIO()
return io.StringIO(response.text)
def lambda_handler(event, context):
lambda_secrets = get_secrets()
os_data = pandas.read_csv(get_satori_audit_data(lambda_secrets))
os_data['flow_timestamp'] = os_data['flow_timestamp'].str[:19]
client = wr.opensearch.connect(
os_host,
username=lambda_secrets['opensearch_user'],
password=lambda_secrets['opensearch_pass'])
wr.opensearch.index_df(client,df=os_data,index=the_index_name,id_keys=["flow_id"],bulk_size=1000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment