Skip to content

Instantly share code, notes, and snippets.

@northwestcoder
Last active October 12, 2022 15:05
Show Gist options
  • Save northwestcoder/7a7403d663c6e6d074e722c4255d53f1 to your computer and use it in GitHub Desktop.
Save northwestcoder/7a7403d663c6e6d074e722c4255d53f1 to your computer and use it in GitHub Desktop.
Satori Snowflake table scan for inventory population
import datetime
import getpass
import os
import re
import sys
import time
from ast import Try

import snowflake.connector
# import sqlparsing
# Constants
# Databases and schemas that are never scanned.
IGNORED_DATABASES = ['SNOWFLAKE_SAMPLE_DATA', 'SNOWFLAKE']
IGNORED_SCHEMAS = ['INFORMATION_SCHEMA']
# Connection details are collected interactively as soon as the module loads.
SNOWFLAKE_ACCOUNT = input("Enter Snowflake Account <account.snowflakecomputing.com> =")
SNOWFLAKE_WAREHOUSE = input("Enter Snowflake Warehouse Name =")
SNOWFLAKE_USER = input("Enter Snowflake user id =")
# Prefer the SNOWFLAKE_PASSWORD environment variable (the usage text below
# tells operators to set it); otherwise prompt with getpass so the password
# is not echoed to the terminal the way input() would echo it.
SNOWFLAKE_PASSWORD = (
    os.environ.get("SNOWFLAKE_PASSWORD")
    or getpass.getpass("Enter Snowflake user password =")
)
ROLE = 'ACCOUNTADMIN'
SATORI_HOSTNAME = input("Enter Satori hostname <account.satoricyber.net> =")
# Row limit applied to every scan query.
ROWS_TO_PULL = 300
# Position key at which a full-mode scan starts; 0 means "from the beginning".
# A blank answer is treated as 0 instead of crashing in int().
_start_key_answer = input("Enter table position key, if don't know put 0 =").strip()
START_TBL_KEY = int(_start_key_answer) if _start_key_answer else 0
# Toggles for the two kinds of relations the scanner can visit.
SCAN_TABLES = True
SCAN_VIEWS = True
# </Constants>
# Single shared session used by every statement in this script.  ``account``
# is the Snowflake account entered above, while ``host`` is set to the Satori
# hostname that was prompted for.
_connection_settings = {
    "user": SNOWFLAKE_USER,
    "password": SNOWFLAKE_PASSWORD,
    "account": SNOWFLAKE_ACCOUNT,
    "warehouse": SNOWFLAKE_WAREHOUSE,
    "host": SATORI_HOSTNAME,
    "role": ROLE,
}
con = snowflake.connector.connect(**_connection_settings)
def _quote_identifier(name):
    """Return *name* as a double-quoted identifier, doubling embedded quotes."""
    return '"{}"'.format(name.replace('"', '""'))


def _show_columns_statement(argv):
    """Build the SHOW COLUMNS statement from the optional CLI scope arguments.

    ``argv[3]``, ``argv[4]`` and ``argv[5]`` are the database, schema and
    table names.  They are inserted verbatim, exactly as typed by the
    operator.  Any other argument count scans the whole account.
    """
    if len(argv) == 4:
        return "SHOW COLUMNS IN DATABASE " + argv[3]
    if len(argv) == 5:
        return "SHOW COLUMNS IN SCHEMA " + argv[3] + "." + argv[4]
    if len(argv) == 6:
        return "SHOW COLUMNS IN TABLE " + argv[3] + "." + argv[4] + "." + argv[5]
    return "SHOW COLUMNS IN ACCOUNT"


def _column_probe_statement(column_name, relation):
    """Build the per-column query that pulls up to ROWS_TO_PULL non-blank values."""
    # The column is quoted the same way the relation name is, so names that
    # are case-sensitive or contain special characters resolve correctly.
    quoted_column = _quote_identifier(column_name)
    return (
        "SELECT TOP {limit} {col} FROM {rel} "
        "WHERE TRY_HEX_DECODE_STRING(HEX_ENCODE({col})) IS NOT NULL "
        "AND LENGTH(TRIM({col})) > 0;"
    ).format(limit=ROWS_TO_PULL, col=quoted_column, rel=relation)


def _run_statement(running_mode, statement, target):
    """Print *statement* in dry mode; otherwise execute it and drain the rows.

    Failures are reported per statement and do not stop the scan.
    """
    if running_mode == "dry":
        print(statement)
        return
    cursor = None
    try:
        cursor = con.cursor()
        # The rows are fetched and discarded: per the usage text the goal is
        # only to retrieve results so the Satori inventory gets populated.
        for _ in cursor.execute(statement):
            pass
    except Exception as error:
        print("Error querying {}: {}".format(target, error))
    finally:
        if cursor is not None:
            cursor.close()


def _scan_relations(running_mode, query_depth_mode, relations, columns, relation_key):
    """Scan one family of relations (tables or views) in full or partial depth."""
    if query_depth_mode == "full":
        for column in columns:
            # Resume support: skip everything before the requested position.
            # Keys start at 1, so START_TBL_KEY == 0 skips nothing.
            if column["key"] < START_TBL_KEY:
                continue
            relation = column[relation_key]
            if running_mode == "wet":
                print("Querying {}.{} at position {}".format(
                    relation, column["column"], column["key"]))
            _run_statement(
                running_mode,
                _column_probe_statement(column["column"], relation),
                "{}.{}".format(relation, column["column"]),
            )
    else:
        for relation in relations:
            if running_mode == "wet":
                print("Querying {}".format(relation))
            _run_statement(
                running_mode,
                "SELECT * FROM {} LIMIT {};".format(relation, ROWS_TO_PULL),
                relation,
            )


def inventory_populator(running_mode: str, query_depth_mode: str) -> None:
    """Enumerate columns via SHOW COLUMNS, then query every table and view.

    Args:
        running_mode: ``"dry"`` prints the generated SQL; any other value
            (the CLI passes ``"wet"``) executes it.
        query_depth_mode: ``"full"`` issues one query per column; any other
            value issues one ``SELECT *`` per table/view.

    The scan scope (account, database, schema or table) is taken from
    ``sys.argv``.  Databases in IGNORED_DATABASES and schemas in
    IGNORED_SCHEMAS are skipped.  In full mode, columns whose position key
    is below START_TBL_KEY are skipped in both dry and wet runs, so a dry run
    previews exactly what a wet run would execute.
    """
    print("Running in {}-{} mode".format(running_mode, query_depth_mode))
    tables = []
    views = []
    seen_tables = set()
    seen_views = set()
    table_columns = []
    view_columns = []
    key_counter = 0
    print("Populating tables")

    cursor = con.cursor()
    try:
        for record in cursor.execute(_show_columns_statement(sys.argv)):
            # SHOW COLUMNS row layout: 0=table, 1=schema, 2=column, 6=kind,
            # 9=database.
            table_name = record[0]
            schema_name = record[1]
            column_name = record[2]
            column_kind = record[6]
            db_name = record[9]
            if db_name in IGNORED_DATABASES or schema_name in IGNORED_SCHEMAS:
                continue

            location = ".".join(
                _quote_identifier(part)
                for part in (db_name, schema_name, table_name)
            )
            key_counter += 1
            if column_kind == "COLUMN":
                if location not in seen_tables:
                    seen_tables.add(location)
                    tables.append(location)
                table_columns.append(
                    {"table": location, "column": column_name, "key": key_counter})
            elif column_kind == "VIEW_COLUMN":
                if location not in seen_views:
                    seen_views.add(location)
                    views.append(location)
                # The counter is advanced a second time for view columns,
                # mirroring the statement order of the previous
                # implementation so position keys stay comparable.
                key_counter += 1
                view_columns.append(
                    {"view": location, "column": column_name, "key": key_counter})
    finally:
        cursor.close()

    if SCAN_TABLES:
        _scan_relations(running_mode, query_depth_mode, tables, table_columns, "table")
    if SCAN_VIEWS:
        _scan_relations(running_mode, query_depth_mode, views, view_columns, "view")
if __name__ == "__main__":
    # Command-line entry point.  With no arguments the usage text is shown;
    # otherwise the first argument selects dry/wet and the second selects
    # full/partial (anything other than "full", including nothing, is
    # treated as partial).
    # NOTE: the module-level prompts and the Snowflake connection above have
    # already run by the time this point is reached.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("""
Welcome to Inventory Populator!
This tool retrieves results from all your Snowflake tables, to populate your Satori inventory.
Pre-requisites:
- Configure the constants in the # constants section of inventory_populator.py
- Use a user with sufficient privileges to view all tables & views
- Set your Snowflake password as the environment variable SNOWFLAKE_PASSWORD (alternatively put it in the connection string)
Modes:
dry/wet mode: dry mode only generates the queries, wet mode executes them
full/partial mode: full requests distinct values from all columns
partial only selects * from tables.
Running the tool:
python3 inventory_populator.py <dry/wet> <full/partial> optional <database_name> <schema_name> <table_name>
To only generate the SQL queries (to be run elsewhere):
python3 inventory_populator.py dry full
To actually execute the queries and populate the inventory:
python3 inventory_populator.py wet full
""")
    elif cli_args[0] in ('dry', 'wet'):
        # Guard the lookup so a missing second argument no longer raises
        # IndexError; it falls back to partial like any non-"full" value.
        wants_full = len(cli_args) > 1 and cli_args[1] == 'full'
        inventory_populator(cli_args[0], 'full' if wants_full else 'partial')
    else:
        # Previously an unknown mode did nothing and exited silently.
        sys.exit("Unknown running mode {!r}: expected 'dry' or 'wet'".format(cli_args[0]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment