Last active
October 12, 2022 15:05
-
-
Save northwestcoder/7a7403d663c6e6d074e722c4255d53f1 to your computer and use it in GitHub Desktop.
Satori Snowflake table scan for inventory population
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ast import Try | |
import datetime | |
import os | |
import re | |
import snowflake.connector | |
# import sqlparsing | |
import time | |
import sys | |
# Constants | |
IGNORED_DATABASES = ['SNOWFLAKE_SAMPLE_DATA', 'SNOWFLAKE'] | |
IGNORED_SCHEMAS = ['INFORMATION_SCHEMA'] | |
SNOWFLAKE_ACCOUNT = str(input("Enter Snowflake Account <account.snowflakecomputing.com> =")) | |
SNOWFLAKE_WAREHOUSE = str(input("Enter Snowflake Warehouse Name =")) # enter SNOWFLAKE_WAREHOUSE name | |
SNOWFLAKE_USER = str(input("Enter Snowflake user id =")) # enter SNOWFLAKE_USER | |
SNOWFLAKE_PASSWORD = str(input("Enter Snowflake user password =")) # enter snowflake password | |
ROLE = 'ACCOUNTADMIN' | |
SATORI_HOSTNAME = str(input("Enter Satori hostname <account.satoricyber.net> =")) #enter Satori SATORI_HOSTNAME | |
ROWS_TO_PULL = 300 | |
START_TBL_KEY = int(input("Enter table position key, if don't know put 0 =")) | |
SCAN_TABLES = True | |
SCAN_VIEWS = True | |
# </Constants> | |
con = snowflake.connector.connect( | |
user=SNOWFLAKE_USER, | |
password= SNOWFLAKE_PASSWORD, | |
account=SNOWFLAKE_ACCOUNT, | |
warehouse=SNOWFLAKE_WAREHOUSE, | |
host=SATORI_HOSTNAME, | |
role=ROLE | |
) | |
def inventory_populator(running_mode: str, query_depth_mode: str): | |
print("Running in {}-{} mode".format(running_mode, query_depth_mode)) | |
tables = [] | |
views = [] | |
table_columns = [] | |
view_columns = [] | |
key_counter = 0 | |
print("Populating tables") | |
# full mode, gathering columns | |
if len(sys.argv)==4: | |
str_sql = "SHOW COLUMNS IN DATABASE " + sys.argv[3] | |
elif len(sys.argv)==5: | |
str_sql = "SHOW COLUMNS IN SCHEMA " + sys.argv[3] + "." + sys.argv[4] | |
elif len(sys.argv)==6: | |
str_sql = "SHOW COLUMNS IN TABLE " + sys.argv[3] + "." + sys.argv[4] + "." + sys.argv[5] | |
else: | |
str_sql = "SHOW COLUMNS IN ACCOUNT" | |
for record in con.cursor().execute(str_sql): | |
table_name = record[0] | |
db_name = record[9] | |
schema_name = record[1] | |
column_name = record[2] | |
column_type = record[6] | |
if (db_name not in IGNORED_DATABASES and schema_name not in IGNORED_SCHEMAS): | |
fully_qualified_location = "\"{}\".\"{}\".\"{}\"".format(db_name.replace("\"", "\"\""), schema_name.replace("\"", "\"\""), table_name.replace("\"", "\"\"")) | |
key_counter=key_counter+1 | |
if column_type == "COLUMN": | |
if fully_qualified_location not in tables: | |
tables.append(fully_qualified_location) | |
table_columns.append({"table": fully_qualified_location, "column": column_name,"key":key_counter}) | |
elif column_type == "VIEW_COLUMN": | |
if fully_qualified_location not in views: | |
views.append(fully_qualified_location) | |
key_counter=key_counter+1 | |
view_columns.append({"view": fully_qualified_location, "column": column_name,"key":key_counter}) | |
if SCAN_TABLES: | |
if query_depth_mode == "full": | |
# going through all columns | |
for column in table_columns: | |
if running_mode == "wet" and column["key"] >= START_TBL_KEY and START_TBL_KEY!=0: | |
print("Querying {}.{} at position {}".format(column["table"], column["column"],column["key"])) | |
#str_sql = "SELECT DISTINCT {} FROM {} LIMIT {};".format(column["column"], column["table"], ROWS_TO_PULL) | |
str_sql = "SELECT TOP 300 {} FROM {} WHERE TRY_HEX_DECODE_STRING(HEX_ENCODE({})) IS NOT NULL AND LENGTH(TRIM({})) > 0;".format(column["column"],column["table"], column["column"],column["column"]) | |
if running_mode == "wet" and START_TBL_KEY == 0: | |
print("Querying {}.{} at position {}".format(column["table"], column["column"],column["key"])) | |
str_sql = "SELECT TOP 300 {} FROM {} WHERE TRY_HEX_DECODE_STRING(HEX_ENCODE({})) IS NOT NULL AND LENGTH(TRIM({})) > 0;".format(column["column"],column["table"], column["column"],column["column"]) | |
if running_mode == "dry": | |
else: | |
try: | |
for record in con.cursor().execute(str_sql): | |
pass | |
except: | |
print("Error querying {}".format(view)) | |
else: | |
for table in tables: | |
if running_mode == "wet": | |
print("Querying {}".format(table)) | |
str_sql = "SELECT * FROM {} LIMIT {};".format(table, ROWS_TO_PULL) | |
if running_mode == "dry": | |
else: | |
try: | |
for record in con.cursor().execute(str_sql): | |
pass | |
except: | |
print("Error querying {}".format(view)) | |
if SCAN_VIEWS: | |
if query_depth_mode == "full": | |
# going through all columns | |
for column in view_columns: | |
if running_mode == "wet" and column["key"] >= START_TBL_KEY and START_TBL_KEY!=0: | |
print("Querying {}.{} at position {}".format(column["table"], column["column"],column["key"])) | |
#str_sql = "SELECT DISTINCT {} FROM {} LIMIT {};".format(column["column"], column["table"], ROWS_TO_PULL) | |
str_sql = "SELECT TOP 300 {} FROM {} WHERE TRY_HEX_DECODE_STRING(HEX_ENCODE({})) IS NOT NULL AND LENGTH(TRIM({})) > 0;".format(column["column"],column["table"], column["column"],column["column"]) | |
if running_mode == "wet" and START_TBL_KEY == 0: | |
print("Querying {}.{}".format(column["view"], column["column"])) | |
str_sql = "SELECT TOP 300 {} FROM {} WHERE TRY_HEX_DECODE_STRING(HEX_ENCODE({})) IS NOT NULL AND LENGTH(TRIM({})) > 0;".format(column["column"],column["table"], column["column"],column["column"]) | |
if running_mode == "dry": | |
else: | |
try: | |
for record in con.cursor().execute(str_sql): | |
pass | |
except: | |
print("Error querying {}".format(view)) | |
else: | |
for view in views: | |
if running_mode == "wet": | |
print("Querying {}".format(view)) | |
str_sql = "SELECT * FROM {} LIMIT {};".format(view, ROWS_TO_PULL) | |
if running_mode == "dry": | |
else: | |
try: | |
for record in con.cursor().execute(str_sql): | |
pass | |
except: | |
print("Error querying {}".format(view)) | |
if __name__ == "__main__": | |
if len(sys.argv) == 1: | |
print(""" | |
Welcome to Inventory Populator! | |
This tool retrieves results from all your Snowflake tables, to populate your Satori inventory. | |
Pre-requisites: | |
- Configure the constants in the # constants section of inventory_populator.py | |
- Use a user with sufficient privileges to view all tables & views | |
- Set your Snowflake password as the environment variable SNOWFLAKE_PASSWORD (alternatively put it in the connection string) | |
Modes: | |
dry/wet mode: dry mode only generates the queries, wet mode executes them | |
full/partial mode: full requests distinct values from all columns | |
partial only selects * from tables. | |
Running the tool: | |
python3 inventory_populator.py <dry/wet> <full/partial> optional <database_name> <schema_name> <table_name> | |
To only generate the SQL queries (to be run elsewhere): | |
python3 inventory_populator.py dry full | |
To actually execute the queries and populate the inventory: | |
python3 inventory_populator.py wet full | |
""") | |
else: | |
if sys.argv[1] in ['dry', 'wet']: | |
if sys.argv[2] in ['full']: | |
inventory_populator(sys.argv[1], 'full') | |
else: | |
inventory_populator(sys.argv[1], 'partial') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment