Skip to content

Instantly share code, notes, and snippets.

@northwestcoder
Last active August 26, 2022 17:41
Show Gist options
  • Save northwestcoder/c7dec1e50b47d92d1f8eef55524ee14b to your computer and use it in GitHub Desktop.
Satori Audit Log Storage
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"id": "46608d8e",
"metadata": {},
"source": [
"#### In the Satori Platform, \"Audit Entries\" in the UX are known as \"data flows\" in the Rest API\n",
"#### This python example connects to /api/data-flow/{accountId}/export to retrieve NN days of these audit entries / data flows\n",
"#### We then load this info into a postgres table\n",
"#### This is a gist - it is not production code!"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "abbb3f32",
"metadata": {},
"outputs": [],
"source": [
"import datetime\n",
"import io\n",
"import json\n",
"import os\n",
"import time\n",
"\n",
"import psycopg2\n",
"import requests\n",
"import yaml"
]
},
{
"cell_type": "markdown",
"id": "d3e6f2fa",
"metadata": {},
"source": [
"#### Requirements\n",
"- You have a Satori account with admin access and the ability to call the Satori Rest API\n",
"- You have a postgres database somewhere, with access to create and drop tables"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4fbaa04d",
"metadata": {},
"outputs": [],
"source": [
"# ENV and CONFIG\n",
"# you need to change or verify *all* of these values for this example to work\n",
"# Secrets are read from environment variables when present so that real\n",
"# credentials never have to be hardcoded in (or committed with) this\n",
"# notebook; the \"CHANGEME\" literals are only placeholders for quick testing.\n",
"\n",
"# Postgres Authentication\n",
"postgres_server = os.environ.get(\"POSTGRES_SERVER\", \"CHANGEME\")\n",
"postgres_port = os.environ.get(\"POSTGRES_PORT\", \"5432\")\n",
"postgres_username = os.environ.get(\"POSTGRES_USER\", \"postgres\")\n",
"postgres_password = os.environ.get(\"POSTGRES_PASSWORD\", \"CHANGEME\")\n",
"postgres_database_name = os.environ.get(\"POSTGRES_DB\", \"CHANGEME\")\n",
"\n",
"# schema and table, we don't create a new schema in this example\n",
"# but we do drop-if-not-exist and then create a new audit table\n",
"# using the postgres_table_name variable\n",
"postgres_schema_name = \"public\"\n",
"postgres_table_name = \"audit_data\"\n",
"\n",
"# Satori Authentication\n",
"# see https://app.satoricyber.com/docs/api for auth info\n",
"satori_serviceaccount_id = os.environ.get(\"SATORI_SA_ID\", \"CHANGEME\")\n",
"satori_serviceaccount_key = os.environ.get(\"SATORI_SA_KEY\", \"CHANGEME\")\n",
"satori_account_id = os.environ.get(\"SATORI_ACCOUNT_ID\", \"CHANGEME\")\n",
"satori_host = \"app.satoricyber.com\"\n",
"\n",
"# HOW MANY DAYS OF PAST AUDIT ENTRIES DO WE WANT? (int)\n",
"days_ago = 7"
]
},
{
"cell_type": "markdown",
"id": "9419273c",
"metadata": {},
"source": [
"#### You should not have to change anything below this line if you are just testing"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5bb790d4",
"metadata": {},
"outputs": [],
"source": [
"# Retriever Function, Satori Audit Data via Rest API\n",
"# we define a function to retrieve audit entries / data flows,\n",
"# this is called in the final jupyter block below\n",
"# This is a sample only, do not use in production!\n",
"def getAuditLogs():\n",
"    \"\"\"Fetch the last `days_ago` days of Satori audit entries (\"data flows\").\n",
"\n",
"    Returns:\n",
"        io.StringIO with the CSV export (the file-like object that\n",
"        psycopg2's copy_expert() expects), or None when authentication\n",
"        or the export request fails.\n",
"    \"\"\"\n",
"\n",
"    # Window: from midnight `days_ago` days back, forward `days_ago` days\n",
"    # (i.e. up through today). The API expects unix epoch milliseconds,\n",
"    # hence the appended \"000\".\n",
"    # NOTE: time.mktime() replaces strftime(\"%s\"), which is a\n",
"    # platform-dependent extension (not available on Windows). The old\n",
"    # code also hard-coded a 30-day window end regardless of days_ago.\n",
"    window_start = datetime.date.today() - datetime.timedelta(days=days_ago)\n",
"    start_epoch = int(time.mktime(window_start.timetuple()))\n",
"    unix_time_start = str(start_epoch) + \"000\"\n",
"    unix_time_end = str(start_epoch + (86400 * days_ago)) + \"000\"\n",
"\n",
"    # Authenticate to Satori for a bearer token\n",
"    authheaders = {'content-type': 'application/json', 'accept': 'application/json'}\n",
"    url = \"https://{}/api/authentication/token\".format(satori_host)\n",
"    try:\n",
"        # json= lets requests serialize the body safely instead of\n",
"        # hand-concatenating a JSON string (which breaks if a value ever\n",
"        # contains quotes or backslashes)\n",
"        r = requests.post(url,\n",
"                          headers=authheaders,\n",
"                          json={\"serviceAccountId\": satori_serviceaccount_id,\n",
"                                \"serviceAccountKey\": satori_serviceaccount_key})\n",
"        r.raise_for_status()\n",
"        satori_token = r.json()[\"token\"]\n",
"    except Exception as err:\n",
"        print(\"Bearer Token Failure: :\", err)\n",
"        print(\"Exception TYPE:\", type(err))\n",
"        return None\n",
"\n",
"    # build request to rest API for audit entries, aka \"data flows\"\n",
"    headers = {\n",
"        'Authorization': 'Bearer {}'.format(satori_token),\n",
"    }\n",
"    auditurl = \"https://{}/api/data-flow/{}/export?from={}&to={}\".format(satori_host,\n",
"                                                                         satori_account_id,\n",
"                                                                         unix_time_start,\n",
"                                                                         unix_time_end)\n",
"    try:\n",
"        response = requests.get(auditurl, headers=headers)\n",
"        response.raise_for_status()\n",
"    except requests.exceptions.RequestException as err:\n",
"        print(\"Retrieval of audit data failed: :\", err)\n",
"        print(\"Exception TYPE:\", type(err))\n",
"        return None\n",
"\n",
"    # return a StringIO object which is what psycopg2 wants to see for its bulk load call\n",
"    return io.StringIO(response.text)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "81afe813",
"metadata": {},
"outputs": [],
"source": [
"# MAIN WORK: create a temp table, copy API data into it, \n",
"# then insert from temp table to final table\n",
"# We assume your connection user has the privileges for this\n",
"\n",
"conn = psycopg2.connect(database=postgres_database_name,\n",
"                        user=postgres_username, \n",
"                        password=postgres_password, \n",
"                        host=postgres_server, \n",
"                        port=postgres_port)\n",
"\n",
"# autocommit means every execute() is committed immediately, so no\n",
"# explicit conn.commit() calls are needed below\n",
"conn.autocommit = True\n",
"cursor = conn.cursor()\n",
"\n",
"# fully-qualified name of the staging table used in steps 2-5\n",
"buffer_table = postgres_schema_name + \".satori_audit_tempbuffer\"\n",
"\n",
"\n",
"###############################################################\n",
"# STEP 1: create our final destination table for audit entries\n",
"# take note of our primary key \"flow_id\", this is used to resolve\n",
"# conflicts in step 4\n",
"\n",
"satori_create_table = \"CREATE TABLE if not exists \"\n",
"satori_create_table += postgres_schema_name + \".\" + postgres_table_name\n",
"satori_create_table += \"\"\"\n",
"(flow_timestamp timestamp, account_id varchar, data_store_id varchar, \n",
"flow_id varchar constraint table_name_audit_data_pk primary key,\n",
"data_store_type varchar, data_store_name varchar, identity_name varchar, identity_role varchar,\n",
"tool varchar, locations_location varchar, queries_value bigint, volume_value bigint,\n",
"incident_ids varchar, tags_name varchar, source varchar, records_value bigint,\n",
"result_set varchar, result_set_column_name varchar, query_original_query varchar,\n",
"datasets_id varchar, datasets_name varchar, query_query_type varchar, query_meta_data varchar, \n",
"query_meta_data_error varchar, actions_type varchar, snowflake_query_id varchar, \n",
"snowflake_warehouse_name varchar, athena_query_execution_id varchar, actions_policy_names varchar, \n",
"geo_location_attrs_country_name varchar, geo_location_attrs_city_name varchar, \n",
"geo_location_attrs_timezone varchar, geo_location_attrs_client_ip_str varchar, \n",
"identity_authentication_method varchar)\n",
"\"\"\"\n",
"try:\n",
"    cursor.execute(satori_create_table)\n",
"except Exception as err:\n",
"    print(\"Oops! An exception has occured:\", err)\n",
"    print(\"Exception TYPE:\", type(err))\n",
" \n",
" \n",
"############################################################\n",
"# STEP 2: create temp table\n",
"# drop any stale buffer left over from a previous failed run first,\n",
"# otherwise the CREATE TABLE below would error out\n",
"\n",
"satori_create_temp_table_sql = \"CREATE TABLE \" + buffer_table\n",
"satori_create_temp_table_sql += \"\"\"\n",
"(flow_timestamp timestamp, account_id varchar, data_store_id varchar, flow_id varchar,\n",
"data_store_type varchar, data_store_name varchar, identity_name varchar, identity_role varchar,\n",
"tool varchar, locations_location varchar, queries_value bigint, volume_value bigint,\n",
"incident_ids varchar, tags_name varchar, source varchar, records_value bigint,\n",
"result_set varchar, result_set_column_name varchar, query_original_query varchar,\n",
"datasets_id varchar, datasets_name varchar, query_query_type varchar, query_meta_data varchar, \n",
"query_meta_data_error varchar, actions_type varchar, snowflake_query_id varchar, \n",
"snowflake_warehouse_name varchar, athena_query_execution_id varchar, actions_policy_names varchar, \n",
"geo_location_attrs_country_name varchar, geo_location_attrs_city_name varchar, \n",
"geo_location_attrs_timezone varchar, geo_location_attrs_client_ip_str varchar, \n",
"identity_authentication_method varchar)\n",
"\"\"\"\n",
"\n",
"try:\n",
"    cursor.execute(\"DROP TABLE IF EXISTS \" + buffer_table)\n",
"    cursor.execute(satori_create_temp_table_sql)\n",
"except Exception as err:\n",
"    print(\"Oops! An exception has occured:\", err)\n",
"    print(\"Exception TYPE:\", type(err))\n",
"\n",
"\n",
"############################################################\n",
"# STEP 3: load from Satori API to temp table\n",
"\n",
"satori_copy_to_buffer = \"COPY \" + buffer_table + \" FROM stdin WITH CSV HEADER DELIMITER ','\"\n",
"\n",
"# getAuditLogs() returns None when the API call failed; skip the load\n",
"# in that case instead of handing psycopg2 a None \"file\"\n",
"audit_csv = getAuditLogs()\n",
"if audit_csv is not None:\n",
"    try:\n",
"        cursor.copy_expert(sql=satori_copy_to_buffer, file=audit_csv)\n",
"    except Exception as err:\n",
"        print(\"Oops! An exception has occured:\", err)\n",
"        print(\"Exception TYPE:\", type(err))\n",
" \n",
" \n",
"############################################################\n",
"# STEP 4: insert from temp table into final table\n",
"# the source table is schema-qualified so this works even when the\n",
"# schema is not on the connection's search_path\n",
"\n",
"satori_insert_sql = \"INSERT into \" + postgres_schema_name + \".\" + postgres_table_name\n",
"satori_insert_sql += \"\"\"\n",
"(flow_timestamp, account_id, data_store_id, flow_id, data_store_type, data_store_name,\n",
"identity_name, identity_role, tool, locations_location, queries_value, volume_value,\n",
"incident_ids, tags_name, source, records_value, result_set, result_set_column_name,\n",
"query_original_query, datasets_id, datasets_name, query_query_type, query_meta_data,\n",
"query_meta_data_error, actions_type, snowflake_query_id, snowflake_warehouse_name,\n",
"athena_query_execution_id, actions_policy_names, geo_location_attrs_country_name,\n",
"geo_location_attrs_city_name, geo_location_attrs_timezone, \n",
"geo_location_attrs_client_ip_str, identity_authentication_method)\n",
"select\n",
"flow_timestamp, account_id, data_store_id, flow_id, data_store_type, data_store_name,\n",
"identity_name, identity_role, tool, locations_location, queries_value, volume_value,\n",
"incident_ids, tags_name, source, records_value, result_set, result_set_column_name,\n",
"query_original_query, datasets_id, datasets_name, query_query_type, query_meta_data,\n",
"query_meta_data_error, actions_type, snowflake_query_id, snowflake_warehouse_name,\n",
"athena_query_execution_id, actions_policy_names, geo_location_attrs_country_name,\n",
"geo_location_attrs_city_name, geo_location_attrs_timezone, \n",
"geo_location_attrs_client_ip_str, identity_authentication_method\n",
"\"\"\"\n",
"satori_insert_sql += \"from \" + buffer_table + \" ON CONFLICT (flow_id) DO NOTHING;\"\n",
"\n",
"try:\n",
"    cursor.execute(satori_insert_sql)\n",
"except Exception as err:\n",
"    print(\"Oops! An exception has occured:\", err)\n",
"    print(\"Exception TYPE:\", type(err))\n",
"\n",
"############################################################\n",
"# STEP 5: drop our temp table, then always close the connection\n",
"\n",
"try:\n",
"    cursor.execute(\"DROP TABLE IF EXISTS \" + buffer_table)\n",
"    print(\"We got to the end, it all seems to have worked out\")\n",
"except Exception as err:\n",
"    print(\"Oops! An exception has occured:\", err)\n",
"    print(\"Exception TYPE:\", type(err))\n",
"finally:\n",
"    conn.close()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment