Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Sample utility functions for using pg8000 to connect with Postgres
"""
Utils to interact with DB using pg8000 library
Note: assumes py3.5+
[Example usage]
# OPTIONALLY retrieve DB password from AWS SSM
import boto3
ssm = boto3.client('ssm')
db_password = ssm.get_parameter(Name=parameter_name, WithDecryption=True)
# get client connection and cursor
client, cur = get_pg_client(user="master", host="127.0.0.1",
port=5432, db="postgres", pwd=db_password)
# Create a dummy table
create_table_statement= \"""
CREATE TABLE IF NOT EXISTS public.test_table(
job_id SERIAL,
kpi VARCHAR(25),
from_id BIGINT,
to_id BIGINT,
status VARCHAR(25),
details TEXT,
retries INT DEFAULT 0,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
finished_at TIMESTAMP WITH TIME zone,
PRIMARY KEY (kpi, from_id, status, retries)
)
TABLESPACE pg_default;
\"""
query(cursor=cur, query=create_table_statement)
insert_dummy_data_statements = [
"insert into public.test_table(kpi, from_id, to_id, status, created_at, finished_at) values ('user_login', 0, 5, 'FINISHED', '2018-08-14T12:46:32Z', '2018-08-14T12:46:32Z')",
"insert into public.test_table(kpi, from_id, to_id, status, created_at, finished_at) values ('purchase_event', 0, 5, 'FINISHED', '2018-08-14T12:46:32Z', '2018-08-14T12:46:32Z');",
"insert into public.test_table(kpi, from_id, to_id, status, created_at, finished_at) values ('user_logout', 0, 5, 'FINISHED', '2018-08-14T12:46:32Z', '2018-08-14T12:46:32Z');"
]
for qry in insert_dummy_data_statements:
query(cursor=cur, query=qry)
# query 2 rows
my_query = "SELECT * FROM test_table LIMIT 2"
query(cursor=cur, query=my_query)
results: List[Tuple[Any]] = cur.fetchall()
for row in results:
print(row)
# get only 1 row
my_query = "SELECT count(*) FROM test_table"
query(cursor=cur, query=my_query)
results: List[Tuple[Any]] = cur.fetchone()
print(results[0])
# get X amount of rows
number_of_rows_to_fetch = 3
my_query = "SELECT * FROM test_table"
query(cursor=cur, query=my_query)
results: List[Tuple[Any]] = cur.fetchmany(number_of_rows_to_fetch)
for row in results:
print(row)
[more info]
For more information, please RTFM: https://github.com/mfenniak/pg8000
"""
import pg8000
def get_pg_client(user: str, host: str, port: int, db: str, pwd: str, auto_commit=True, ssl=False):
""" Utility function to get connection and cursor for postgres interaction """
client = pg8000.connect(user=user, host=host, port=port,
database=db, password=pwd, ssl=ssl)
client.autocommit = auto_commit
cur = client.cursor()
return client, cur
def close_db_conn(conn, cur) -> None:
""" Utility function to close an existing DB connection to avoid idle sessions """
try:
cur.close()
conn.close()
except pg8000.core.InterfaceError as e:
pass
def query(cursor, query: str, logger=None) -> None:
""" DRY """
if logger:
logger.info("Executing insert query: {}".format(query))
cursor.execute(query)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment