Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@iamaziz
Created October 31, 2020 01:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iamaziz/5a86da581564de977c31b2b5f64f6258 to your computer and use it in GitHub Desktop.
Save iamaziz/5a86da581564de977c31b2b5f64f6258 to your computer and use it in GitHub Desktop.
Redshift Database Hook (adapter)
from __future__ import annotations
from functools import lru_cache, wraps
from psycopg2 import connect
from pandas import read_sql, DataFrame
class RedshiftDB:
"""Redshift Database Hook
USAGE:
creds = {
'host': '',
'user': '',
'password': '',
'port': 5439,
'dbname': ''
}
db = RedshiftDB(creds)
db.database_users
db.disk_usage
db.database_info
..
db.query('select * from <MYTABLE>;')
..
# for all available propertes, see
db.available_properties
More info, see: https://docs.aws.amazon.com/redshift/latest/dg/cm_chap_system-tables.html
"""
def __init__(self, creds: dict[str, str]):
self._conn = self._connect_to_redshift(creds)
def __repr__(self):
return str(self._conn)
def __del__(self):
self._conn.close()
@staticmethod
@wraps(connect)
def _connect_to_redshift(creds):
return connect(**creds)
@property
def available_properties(self):
return [m for m in dir(self) if not m.startswith("_")]
@wraps(read_sql)
def query(self, sql: str, **kwargs) -> DataFrame:
return read_sql(sql, con=self._conn, **kwargs)
@lru_cache()
def cached_query(self, sql: str):
return self.query(sql)
def _info(self, *args):
return self.cached_query(f"select * from {args[0]};")
@property
def recent_queries(self):
"""Info about the currently active and recently run queries against a database."""
return self._info("stv_recents")
@property
def tables(self):
"""view tables in local and external catalogs"""
return self._info("svv_tables")
@property
def tables_info(self):
"""Shows summary information for tables in the database"""
return self._info("svv_table_info")
@property
def transactions(self):
"""Transactions that currently hold locks on tables in the database"""
return self._info("svv_transactions")
@property
def database_info(self):
return self._info("pg_database_info")
@property
def database_users(self):
"""Data about the database users"""
return self._info("svl_user_info")
@property
def disk_usage(self):
return self._info("svv_diskusage")
@property
def tables_definitions(self):
return self._info("pg_table_def")
@property
def external_databases(self):
return self._info("svv_external_databases")
@property
def external_partitions(self):
return self._info("svv_external_partitions")
@property
def disk_partitions(self):
return self._info("stv_partitions")
@property
def external_schemas(self):
return self._info("svv_external_schemas")
@property
def external_tables(self):
return self._info("svv_external_tables")
@property
def external_columns(self):
return self._info("svv_external_columns")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment