-
-
Save northwestcoder/c7dec1e50b47d92d1f8eef55524ee14b to your computer and use it in GitHub Desktop.
Satori Audit Log Storage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"id": "46608d8e", | |
"metadata": {}, | |
"source": [ | |
"#### In the Satori Platform, \"Audit Entries\" in the UX are known as \"data flows\" in the Rest API\n", | |
"#### This python example connects to /api/data-flow/{accountId}/export to retrieve NN days of these audit entries / data flows\n", | |
"#### We then load this info into a postgres table\n", | |
"#### This is a gist - it is not production code!" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"id": "abbb3f32", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import json\n", | |
"import requests\n", | |
"import time\n", | |
"import datetime\n", | |
"import yaml\n", | |
"import io\n", | |
"import psycopg2" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "d3e6f2fa", | |
"metadata": {}, | |
"source": [ | |
"#### Requirements\n", | |
"- You have a Satori account with admin access and the ability to call the Satori Rest API\n", | |
"- You have a postgres database somewhere, with access to create and drop tables" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"id": "4fbaa04d", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# ENV and CONFIG\n", | |
"# you need to change or verify *all* of these values for this example to work\n", | |
"\n", | |
"# Postgres Authentication\n", | |
"postgres_server = \"CHANGEME\"\n", | |
"postgres_port = \"5432\"\n", | |
"postgres_username = \"postgres\"\n", | |
"postgres_password = \"CHANGEME\"\n", | |
"postgres_database_name = \"CHANGEME\"\n", | |
"\n", | |
"# schema and table, we don't create a new schema in this example\n", | |
"# but we do create the audit table if it does not already exist,\n", | |
"# using the postgres_table_name variable\n", | |
"postgres_schema_name = \"public\"\n", | |
"postgres_table_name = \"audit_data\"\n", | |
"\n", | |
"# Satori Authentication\n", | |
"# see https://app.satoricyber.com/docs/api for auth info\n", | |
"satori_serviceaccount_id = \"CHANGEME\"\n", | |
"satori_serviceaccount_key = \"CHANGEME\"\n", | |
"satori_account_id = \"CHANGEME\"\n", | |
"satori_host = \"app.satoricyber.com\"\n", | |
"\n", | |
"# HOW MANY DAYS OF PAST AUDIT ENTRIES DO WE WANT? (int)\n", | |
"days_ago = 7" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "9419273c", | |
"metadata": {}, | |
"source": [ | |
"#### You should not have to change anything below this line if you are just testing" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"id": "5bb790d4", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Retriever Function, Satori Audit Data via Rest API\n", | |
"# we define a function to retrieve audit entries / data flows, \n", | |
"# this is called in the final jupyter block below\n", | |
"# This is a sample only, do not use in production!\n", | |
"def getAuditLogs():\n", | |
"\n", | |
" # This function retrieves Satori audit entries from the last thirty days up to yesterday\n", | |
" yesterday_start = datetime.date.today() - datetime.timedelta(days_ago)\n", | |
" unix_time_start = yesterday_start.strftime(\"%s\") + \"000\"\n", | |
" unix_time_end = str(int(yesterday_start.strftime(\"%s\")) + (86400*30)) + \"000\"\n", | |
"\n", | |
" # Authenticate to Satori for a bearer token\n", | |
" authheaders = {'content-type': 'application/json','accept': 'application/json'}\n", | |
" url = \"https://{}/api/authentication/token\".format(satori_host)\n", | |
" try:\n", | |
" r = requests.post(url, \n", | |
" headers=authheaders, \n", | |
" data='{\"serviceAccountId\": \"' + satori_serviceaccount_id + \n", | |
" '\", \"serviceAccountKey\": \"' + satori_serviceaccount_key + '\"}')\n", | |
" response = r.json()\n", | |
" satori_token = response[\"token\"]\n", | |
" except Exception as err:\n", | |
" print(\"Bearer Token Failure: :\", err)\n", | |
" print(\"Exception TYPE:\", type(err))\n", | |
" else:\n", | |
" # psycopg2 expects a file-like object\n", | |
" sqlFile = io.StringIO()\n", | |
" \n", | |
" # build request to rest API for audit entries, aka \"data flows\"\n", | |
" payload = {}\n", | |
" headers = {\n", | |
" 'Authorization': 'Bearer {}'.format(satori_token),\n", | |
" }\n", | |
" auditurl = \"https://{}/api/data-flow/{}/export?from={}&to={}\".format(satori_host,\n", | |
" satori_account_id,\n", | |
" unix_time_start,\n", | |
" unix_time_end)\n", | |
" try:\n", | |
" response = requests.get(auditurl, headers=headers, data=payload)\n", | |
" response.raise_for_status()\n", | |
" except requests.exceptions.RequestException as err:\n", | |
" print(\"Retrieval of audit data failed: :\", err)\n", | |
" print(\"Exception TYPE:\", type(err))\n", | |
" else:\n", | |
" # return a StringIO object which is what psycopg2 wants to see for its bulk load call\n", | |
" return io.StringIO(response.text)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"id": "81afe813", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# MAIN WORK: create a temp table, copy API data into it, \n", | |
"# then insert from temp table to final table\n", | |
"# We assume your connection user has the privileges for this\n", | |
"\n", | |
"conn = psycopg2.connect(database=postgres_database_name,\n", | |
" user=postgres_username, \n", | |
" password=postgres_password, \n", | |
" host=postgres_server, \n", | |
" port=postgres_port)\n", | |
" \n", | |
"conn.autocommit = True\n", | |
"cursor = conn.cursor()\n", | |
"\n", | |
"\n", | |
"###############################################################\n", | |
"# STEP 1: create our final destination table for audit entries\n", | |
"# take note of our primary key \"flow_id\", this is used to resolve\n", | |
"# conflicts in step 4\n", | |
"\n", | |
"satori_create_table = \"CREATE TABLE if not exists \"\n", | |
"satori_create_table += postgres_schema_name + \".\" + postgres_table_name\n", | |
"satori_create_table += \"\"\"\n", | |
"(flow_timestamp timestamp, account_id varchar, data_store_id varchar, \n", | |
"flow_id varchar constraint table_name_audit_data_pk primary key,\n", | |
"data_store_type varchar, data_store_name varchar, identity_name varchar, identity_role varchar,\n", | |
"tool varchar, locations_location varchar, queries_value bigint, volume_value bigint,\n", | |
"incident_ids varchar, tags_name varchar, source varchar, records_value bigint,\n", | |
"result_set varchar, result_set_column_name varchar, query_original_query varchar,\n", | |
"datasets_id varchar, datasets_name varchar, query_query_type varchar, query_meta_data varchar, \n", | |
"query_meta_data_error varchar, actions_type varchar, snowflake_query_id varchar, \n", | |
"snowflake_warehouse_name varchar, athena_query_execution_id varchar, actions_policy_names varchar, \n", | |
"geo_location_attrs_country_name varchar, geo_location_attrs_city_name varchar, \n", | |
"geo_location_attrs_timezone varchar, geo_location_attrs_client_ip_str varchar, \n", | |
"identity_authentication_method varchar)\n", | |
"\"\"\"\n", | |
"try:\n", | |
" cursor.execute(satori_create_table)\n", | |
" conn.commit()\n", | |
"except Exception as err:\n", | |
" print(\"Oops! An exception has occured:\", err)\n", | |
" print(\"Exception TYPE:\", type(err))\n", | |
" \n", | |
" \n", | |
"############################################################\n", | |
"# STEP 2: create temp table\n", | |
"\n", | |
"satori_create_temp_table_sql = \"CREATE TABLE \"\n", | |
"satori_create_temp_table_sql += postgres_schema_name + \".satori_audit_tempbuffer\"\n", | |
"satori_create_temp_table_sql += \"\"\"\n", | |
"(flow_timestamp timestamp, account_id varchar, data_store_id varchar, flow_id varchar,\n", | |
"data_store_type varchar, data_store_name varchar, identity_name varchar, identity_role varchar,\n", | |
"tool varchar, locations_location varchar, queries_value bigint, volume_value bigint,\n", | |
"incident_ids varchar, tags_name varchar, source varchar, records_value bigint,\n", | |
"result_set varchar, result_set_column_name varchar, query_original_query varchar,\n", | |
"datasets_id varchar, datasets_name varchar, query_query_type varchar, query_meta_data varchar, \n", | |
"query_meta_data_error varchar, actions_type varchar, snowflake_query_id varchar, \n", | |
"snowflake_warehouse_name varchar, athena_query_execution_id varchar, actions_policy_names varchar, \n", | |
"geo_location_attrs_country_name varchar, geo_location_attrs_city_name varchar, \n", | |
"geo_location_attrs_timezone varchar, geo_location_attrs_client_ip_str varchar, \n", | |
"identity_authentication_method varchar)\n", | |
"\"\"\"\n", | |
"\n", | |
"try:\n", | |
" cursor.execute(query=satori_create_temp_table_sql)\n", | |
" conn.commit()\n", | |
"except Exception as err:\n", | |
" print(\"Oops! An exception has occured:\", err)\n", | |
" print(\"Exception TYPE:\", type(err))\n", | |
"\n", | |
"\n", | |
"############################################################\n", | |
"# STEP 3: load from Satori API to temp table\n", | |
"\n", | |
"satori_copy_to_buffer = \"COPY \" + postgres_schema_name + \".satori_audit_tempbuffer\" + \" FROM stdin WITH CSV HEADER DELIMITER ','\"\n", | |
"\n", | |
"try:\n", | |
" cursor.copy_expert(sql=satori_copy_to_buffer, file=getAuditLogs())\n", | |
" conn.commit()\n", | |
"except Exception as err:\n", | |
" print(\"Oops! An exception has occured:\", err)\n", | |
" print(\"Exception TYPE:\", type(err))\n", | |
" \n", | |
" \n", | |
"############################################################\n", | |
"# STEP 4: insert from temp table into final table\n", | |
"\n", | |
"satori_insert_sql = \"INSERT into \" + postgres_schema_name + \".\" + postgres_table_name\n", | |
"satori_insert_sql += \"\"\"\n", | |
"(flow_timestamp, account_id, data_store_id, flow_id, data_store_type, data_store_name,\n", | |
"identity_name, identity_role, tool, locations_location, queries_value, volume_value,\n", | |
"incident_ids, tags_name, source, records_value, result_set, result_set_column_name,\n", | |
"query_original_query, datasets_id, datasets_name, query_query_type, query_meta_data,\n", | |
"query_meta_data_error, actions_type, snowflake_query_id, snowflake_warehouse_name,\n", | |
"athena_query_execution_id, actions_policy_names, geo_location_attrs_country_name,\n", | |
"geo_location_attrs_city_name, geo_location_attrs_timezone, \n", | |
"geo_location_attrs_client_ip_str, identity_authentication_method)\n", | |
"select\n", | |
"flow_timestamp, account_id, data_store_id, flow_id, data_store_type, data_store_name,\n", | |
"identity_name, identity_role, tool, locations_location, queries_value, volume_value,\n", | |
"incident_ids, tags_name, source, records_value, result_set, result_set_column_name,\n", | |
"query_original_query, datasets_id, datasets_name, query_query_type, query_meta_data,\n", | |
"query_meta_data_error, actions_type, snowflake_query_id, snowflake_warehouse_name,\n", | |
"athena_query_execution_id, actions_policy_names, geo_location_attrs_country_name,\n", | |
"geo_location_attrs_city_name, geo_location_attrs_timezone, \n", | |
"geo_location_attrs_client_ip_str, identity_authentication_method\n", | |
"from satori_audit_tempbuffer ON CONFLICT (flow_id) DO NOTHING;\n", | |
"\"\"\"\n", | |
"\n", | |
"try:\n", | |
" cursor.execute(query=satori_insert_sql)\n", | |
" conn.commit()\n", | |
"except Exception as err:\n", | |
" print(\"Oops! An exception has occured:\", err)\n", | |
" print(\"Exception TYPE:\", type(err))\n", | |
"\n", | |
"############################################################\n", | |
"# STEP 5: drop our temp table\n", | |
"\n", | |
"satori_drop_temp_table_sql = \"DROP TABLE \"\n", | |
"satori_drop_temp_table_sql += postgres_schema_name + \".satori_audit_tempbuffer\"\n", | |
"\n", | |
"try:\n", | |
" cursor.execute(query=satori_drop_temp_table_sql)\n", | |
" conn.commit()\n", | |
" conn.close()\n", | |
" print(\"We got to the end, it all seems to have worked out\")\n", | |
"except Exception as err:\n", | |
" print(\"Oops! An exception has occured:\", err)\n", | |
" print(\"Exception TYPE:\", type(err))\n", | |
" conn.close() \n", | |
" " | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3 (ipykernel)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.10.4" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment