Last active
November 8, 2022 16:33
-
-
Save northwestcoder/7b948a92b30dab5d9c9c8e31efc1fecd to your computer and use it in GitHub Desktop.
AWS Lambda to Push Satori Audit Data into AWS OpenSearch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import awswrangler as wr | |
import requests | |
import time | |
import datetime | |
import io | |
import pandas | |
import boto3 | |
import base64 | |
from botocore.exceptions import ClientError | |
import json | |
secret_name = "CHANGE_TO_YOUR_SECRET_NAME" | |
region_name = "CHANGE_TO_YOUR_REGION" | |
os_host = "CHANGE_TO_YOUR_OPENSEARCH_DOMAIN" | |
the_index_name = "satori_test" | |
def get_secrets(): | |
# Create a Secrets Manager client | |
session = boto3.session.Session() | |
client = session.client( | |
service_name='secretsmanager', | |
region_name=region_name | |
) | |
# In this sample we only handle the specific exceptions for the 'GetSecretValue' API. | |
# See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html | |
# We rethrow the exception by default. | |
try: | |
get_secret_value_response = client.get_secret_value(SecretId=secret_name) | |
except ClientError as e: | |
if e.response['Error']['Code'] == 'DecryptionFailureException': | |
# Secrets Manager can't decrypt the protected secret text using the provided KMS key. | |
# Deal with the exception here, and/or rethrow at your discretion. | |
raise e | |
elif e.response['Error']['Code'] == 'InternalServiceErrorException': | |
# An error occurred on the server side. | |
# Deal with the exception here, and/or rethrow at your discretion. | |
raise e | |
elif e.response['Error']['Code'] == 'InvalidParameterException': | |
# You provided an invalid value for a parameter. | |
# Deal with the exception here, and/or rethrow at your discretion. | |
raise e | |
elif e.response['Error']['Code'] == 'InvalidRequestException': | |
# You provided a parameter value that is not valid for the current state of the resource. | |
# Deal with the exception here, and/or rethrow at your discretion. | |
raise e | |
elif e.response['Error']['Code'] == 'ResourceNotFoundException': | |
# We can't find the resource that you asked for. | |
# Deal with the exception here, and/or rethrow at your discretion. | |
raise e | |
else: | |
# Decrypts secret using the associated KMS key. | |
# Depending on whether the secret is a string or binary, one of these fields will be populated. | |
if 'SecretString' in get_secret_value_response: | |
secret = get_secret_value_response['SecretString'] | |
return json.loads(secret) | |
else: | |
decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary']) | |
return decoded_binary_secret | |
# Your code goes here. | |
def get_satori_audit_data(lambda_secrets): | |
satori_serviceaccount_id = lambda_secrets['satori_service_id'] | |
satori_serviceaccount_key = lambda_secrets['satori_service_key'] | |
satori_account_id = lambda_secrets['satori_account_id'] | |
satori_host = "app.satoricyber.com" | |
# This function retrieves Satori audit entries from the last 15 minutes | |
min15ago = datetime.datetime.now() - datetime.timedelta(minutes=15) | |
unix_time_start = min15ago.strftime("%s") + "000" | |
unix_time_end = str(int(min15ago.strftime("%s")) + (900)) + "000" | |
# Authenticate to Satori for a bearer token | |
authheaders = {'content-type': 'application/json','accept': 'application/json'} | |
url = "https://{}/api/authentication/token".format(satori_host) | |
try: | |
r = requests.post(url, | |
headers=authheaders, | |
data='{"serviceAccountId": "' + satori_serviceaccount_id + | |
'", "serviceAccountKey": "' + satori_serviceaccount_key + '"}') | |
response = r.json() | |
satori_token = response["token"] | |
except Exception as err: | |
print("Bearer Token Failure: :", err) | |
print("Exception TYPE:", type(err)) | |
# build request to rest API for audit entries, aka "data flows" | |
payload = {} | |
headers = { | |
'Authorization': 'Bearer {}'.format(satori_token), | |
} | |
auditurl = "https://{}/api/data-flow/{}/export?from={}&to={}".format(satori_host, | |
satori_account_id, | |
unix_time_start, | |
unix_time_end | |
) | |
try: | |
response = requests.get(auditurl, headers=headers, data=payload) | |
response.raise_for_status() | |
except requests.exceptions.RequestException as err: | |
print("Retrieval of audit data failed: :", err) | |
print("Exception TYPE:", type(err)) | |
else: | |
fileobj = io.StringIO() | |
return io.StringIO(response.text) | |
def lambda_handler(event, context): | |
lambda_secrets = get_secrets() | |
os_data = pandas.read_csv(get_satori_audit_data(lambda_secrets)) | |
os_data['flow_timestamp'] = os_data['flow_timestamp'].str[:19] | |
client = wr.opensearch.connect( | |
os_host, | |
username=lambda_secrets['opensearch_user'], | |
password=lambda_secrets['opensearch_pass']) | |
wr.opensearch.index_df(client,df=os_data,index=the_index_name,id_keys=["flow_id"],bulk_size=1000) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment