Skip to content

Instantly share code, notes, and snippets.

@diogoaurelio
Created October 24, 2018 07:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save diogoaurelio/c6330f0e4aab98513f523053bad0e846 to your computer and use it in GitHub Desktop.
Save diogoaurelio/c6330f0e4aab98513f523053bad0e846 to your computer and use it in GitHub Desktop.
Sample utility functions for using pg8000 to connect with Postgres
"""
Utils to interact with DB using pg8000 library
Note: assumes Python 3.5+ (the variable annotations in the example below require 3.6+)
[Example usage]
# OPTIONALLY retrieve DB password from AWS SSM
import boto3
ssm = boto3.client('ssm')
db_password = ssm.get_parameter(Name=parameter_name, WithDecryption=True)['Parameter']['Value']
# get client connection and cursor
client, cur = get_pg_client(user="master", host="127.0.0.1",
port=5432, db="postgres", pwd=db_password)
# Create a dummy table
create_table_statement= \"""
CREATE TABLE IF NOT EXISTS public.test_table(
job_id SERIAL,
kpi VARCHAR(25),
from_id BIGINT,
to_id BIGINT,
status VARCHAR(25),
details TEXT,
retries INT DEFAULT 0,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
finished_at TIMESTAMP WITH TIME ZONE,
PRIMARY KEY (kpi, from_id, status, retries)
)
TABLESPACE pg_default;
\"""
query(cursor=cur, query=create_table_statement)
insert_dummy_data_statements = [
"insert into public.test_table(kpi, from_id, to_id, status, created_at, finished_at) values ('user_login', 0, 5, 'FINISHED', '2018-08-14T12:46:32Z', '2018-08-14T12:46:32Z')",
"insert into public.test_table(kpi, from_id, to_id, status, created_at, finished_at) values ('purchase_event', 0, 5, 'FINISHED', '2018-08-14T12:46:32Z', '2018-08-14T12:46:32Z');",
"insert into public.test_table(kpi, from_id, to_id, status, created_at, finished_at) values ('user_logout', 0, 5, 'FINISHED', '2018-08-14T12:46:32Z', '2018-08-14T12:46:32Z');"
]
for qry in insert_dummy_data_statements:
query(cursor=cur, query=qry)
# query 2 rows
my_query = "SELECT * FROM test_table LIMIT 2"
query(cursor=cur, query=my_query)
results: List[Tuple[Any]] = cur.fetchall()
for row in results:
print(row)
# get only 1 row
my_query = "SELECT count(*) FROM test_table"
query(cursor=cur, query=my_query)
results: Sequence[Any] = cur.fetchone()  # fetchone() returns one row, not a list of rows
print(results[0])
# get X amount of rows
number_of_rows_to_fetch = 3
my_query = "SELECT * FROM test_table"
query(cursor=cur, query=my_query)
results: List[Tuple[Any]] = cur.fetchmany(number_of_rows_to_fetch)
for row in results:
print(row)
[more info]
For more information, see the pg8000 documentation: https://github.com/mfenniak/pg8000
"""
import pg8000
def get_pg_client(user: str, host: str, port: int, db: str, pwd: str, auto_commit=True, ssl=False):
    """Open a pg8000 connection and return it together with a fresh cursor.

    Args:
        user: Database user name.
        host: Server host name or address.
        port: Server TCP port.
        db: Database name, passed to pg8000 as ``database``.
        pwd: Password, passed to pg8000 as ``password``.
        auto_commit: Value assigned to the connection's ``autocommit``
            attribute right after connecting.
        ssl: Forwarded unchanged as the ``ssl`` argument of
            ``pg8000.connect``.

    Returns:
        A ``(connection, cursor)`` tuple.
    """
    connect_kwargs = {
        "user": user,
        "host": host,
        "port": port,
        "database": db,
        "password": pwd,
        "ssl": ssl,
    }
    connection = pg8000.connect(**connect_kwargs)
    connection.autocommit = auto_commit
    return connection, connection.cursor()
def close_db_conn(conn, cur) -> None:
    """Close a cursor and its connection to avoid leaving idle sessions.

    The cursor is closed first, then the connection. Each one is closed in
    its own ``try`` so that an error while closing the cursor does not skip
    closing the connection.

    Args:
        conn: Connection object exposing ``close()``.
        cur: Cursor object exposing ``close()``.

    Returns:
        None. ``pg8000.InterfaceError`` raised by either ``close()`` call is
        suppressed; any other exception propagates.
    """
    for resource in (cur, conn):
        try:
            resource.close()
        except pg8000.InterfaceError:
            # Best-effort cleanup: this error is deliberately ignored
            # (typically the resource was already closed), and we still
            # move on to the next resource.
            continue
def query(cursor, query: str, logger=None, params=None) -> None:
    """Execute one SQL statement on ``cursor``, optionally logging it first.

    Args:
        cursor: DB-API style cursor exposing ``execute()``.
        query: SQL text to execute.
        logger: Optional logger; when truthy, the statement is logged at
            INFO level before execution.
        params: Optional bind parameters. When given, they are passed as the
            second argument to ``cursor.execute`` so values are bound by the
            driver instead of being interpolated into the SQL string.

    Returns:
        None. Fetch results from ``cursor`` afterwards (``fetchone`` etc.).
    """
    if logger:
        # Neutral wording: this helper runs any statement, not only inserts.
        # Lazy %-args leave the formatting to the logging framework.
        logger.info("Executing query: %s", query)
    if params is None:
        # Keep the original single-argument call when nothing is bound.
        cursor.execute(query)
    else:
        cursor.execute(query, params)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment