Created
October 31, 2020 01:04
-
-
Save iamaziz/5a86da581564de977c31b2b5f64f6258 to your computer and use it in GitHub Desktop.
Redshift Database Hook (adapter)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from functools import lru_cache, wraps | |
from psycopg2 import connect | |
from pandas import read_sql, DataFrame | |
class RedshiftDB:
    """Redshift database hook: a small query helper around one DB-API connection.

    USAGE:
        creds = {
            'host': '',
            'user': '',
            'password': '',
            'port': 5439,
            'dbname': ''
        }
        db = RedshiftDB(creds)
        db.database_users
        db.disk_usage
        db.database_info
        ..
        db.query('select * from <MYTABLE>;')
        ..
        # for all available properties, see
        db.available_properties

        # release the connection explicitly, or use the hook as a context manager
        db.close()
        with RedshiftDB(creds) as db:
            db.tables

    The table-backed properties go through ``cached_query``, so repeated
    access returns the same cached ``DataFrame``; call ``clear_cache()`` to
    force the next access to hit the database again.

    More info, see: https://docs.aws.amazon.com/redshift/latest/dg/cm_chap_system-tables.html
    """

    # Maximum number of result frames memoised per instance (128 is also the
    # default ``maxsize`` of ``functools.lru_cache``).
    _CACHE_SIZE = 128

    def __init__(self, creds: dict[str, str]):
        """Open the connection; ``creds`` is passed to ``connect`` as keyword arguments."""
        self._conn = self._connect_to_redshift(creds)

    def __repr__(self):
        return str(self._conn)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Returning None lets any exception raised inside the ``with`` body propagate.
        self.close()

    def __del__(self):
        # Best-effort cleanup; prefer close() or a ``with`` block.
        self.close()

    def close(self):
        """Close the underlying connection if one was ever opened."""
        # ``_conn`` is missing when connect() raised inside __init__ (or when
        # __init__ was bypassed); __del__ still runs on that half-built object.
        conn = getattr(self, "_conn", None)
        if conn is not None:
            conn.close()

    @staticmethod
    def _connect_to_redshift(creds):
        """Return ``connect(**creds)``."""
        return connect(**creds)

    @property
    def available_properties(self):
        """Names of every attribute of this object that does not start with ``_``."""
        return [name for name in dir(self) if not name.startswith("_")]

    def query(self, sql: str, **kwargs) -> DataFrame:
        """Run ``sql`` on the hook's connection and return the result as a DataFrame.

        Args:
            sql: The statement handed to ``pandas.read_sql``.
            **kwargs: Forwarded unchanged to ``pandas.read_sql`` (``con`` is
                supplied by the hook and must not be passed).

        Raises:
            Whatever ``pandas.read_sql`` raises; the error is re-raised after
            a rollback has been attempted.
        """
        try:
            return read_sql(sql, con=self._conn, **kwargs)
        except Exception:
            # A failed statement leaves a psycopg2 transaction aborted, and
            # every later statement is rejected until rollback() is called.
            try:
                self._conn.rollback()
            except Exception:
                # Deliberately ignored: the original query error raised below
                # is more useful than a secondary rollback failure.
                pass
            raise

    def cached_query(self, sql: str):
        """Return ``self.query(sql)``, memoised per instance by the SQL text.

        At most ``_CACHE_SIZE`` results are kept; the least recently used one
        is dropped first. A cache hit returns the same DataFrame object as
        before, so in-place edits by a caller are visible to later callers.
        """
        # The cache lives on the instance (not in a class-level lru_cache), so
        # it never keeps the object -- and its open connection -- alive.
        # Created lazily so it also works when __init__ was bypassed.
        cache = vars(self).setdefault("_query_cache", {})
        try:
            # Pop and re-insert below so dict order tracks recency of use.
            frame = cache.pop(sql)
        except KeyError:
            frame = self.query(sql)
            while cache and len(cache) >= self._CACHE_SIZE:
                del cache[next(iter(cache))]  # first key == least recently used
        cache[sql] = frame
        return frame

    def clear_cache(self):
        """Forget all memoised results so the next access queries the database."""
        vars(self).pop("_query_cache", None)

    def _info(self, table, *_ignored):
        """Return the cached result of ``select * from <table>;``.

        ``table`` is interpolated into the SQL text as-is, so it must only
        ever be a trusted, hard-coded relation name. Extra positional
        arguments are accepted and ignored.
        """
        return self.cached_query(f"select * from {table};")

    @property
    def recent_queries(self):
        """Info about the currently active and recently run queries against a database (``stv_recents``)."""
        return self._info("stv_recents")

    @property
    def tables(self):
        """View tables in local and external catalogs (``svv_tables``)."""
        return self._info("svv_tables")

    @property
    def tables_info(self):
        """Shows summary information for tables in the database (``svv_table_info``)."""
        return self._info("svv_table_info")

    @property
    def transactions(self):
        """Transactions that currently hold locks on tables in the database (``svv_transactions``)."""
        return self._info("svv_transactions")

    @property
    def database_info(self):
        """Contents of ``pg_database_info``."""
        return self._info("pg_database_info")

    @property
    def database_users(self):
        """Data about the database users (``svl_user_info``)."""
        return self._info("svl_user_info")

    @property
    def disk_usage(self):
        """Contents of ``svv_diskusage``."""
        return self._info("svv_diskusage")

    @property
    def tables_definitions(self):
        """Contents of ``pg_table_def``."""
        return self._info("pg_table_def")

    @property
    def external_databases(self):
        """Contents of ``svv_external_databases``."""
        return self._info("svv_external_databases")

    @property
    def external_partitions(self):
        """Contents of ``svv_external_partitions``."""
        return self._info("svv_external_partitions")

    @property
    def disk_partitions(self):
        """Contents of ``stv_partitions``."""
        return self._info("stv_partitions")

    @property
    def external_schemas(self):
        """Contents of ``svv_external_schemas``."""
        return self._info("svv_external_schemas")

    @property
    def external_tables(self):
        """Contents of ``svv_external_tables``."""
        return self._info("svv_external_tables")

    @property
    def external_columns(self):
        """Contents of ``svv_external_columns``."""
        return self._info("svv_external_columns")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment