Created
October 24, 2018 07:50
-
-
Save diogoaurelio/c6330f0e4aab98513f523053bad0e846 to your computer and use it in GitHub Desktop.
Sample utility functions for using pg8000 to connect with Postgres
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Utils to interact with DB using pg8000 library | |
Note: assumes py3.5+ | |
[Example usage] | |
# OPTIONALLY retrieve DB password from AWS SSM | |
import boto3 | |
ssm = boto3.client('ssm') | |
db_password = ssm.get_parameter(Name=parameter_name, WithDecryption=True) | |
# get client connection and cursor | |
client, cur = get_pg_client(user="master", host="127.0.0.1", | |
port=5432, db="postgres", pwd=db_password) | |
# Create a dummy table | |
create_table_statement= \""" | |
CREATE TABLE IF NOT EXISTS public.test_table( | |
job_id SERIAL, | |
kpi VARCHAR(25), | |
from_id BIGINT, | |
to_id BIGINT, | |
status VARCHAR(25), | |
details TEXT, | |
retries INT DEFAULT 0, | |
created_at TIMESTAMP WITH TIME ZONE NOT NULL, | |
finished_at TIMESTAMP WITH TIME zone, | |
PRIMARY KEY (kpi, from_id, status, retries) | |
) | |
TABLESPACE pg_default; | |
\""" | |
query(cursor=cur, query=create_table_statement) | |
insert_dummy_data_statements = [ | |
"insert into public.test_table(kpi, from_id, to_id, status, created_at, finished_at) values ('user_login', 0, 5, 'FINISHED', '2018-08-14T12:46:32Z', '2018-08-14T12:46:32Z')", | |
"insert into public.test_table(kpi, from_id, to_id, status, created_at, finished_at) values ('purchase_event', 0, 5, 'FINISHED', '2018-08-14T12:46:32Z', '2018-08-14T12:46:32Z');", | |
"insert into public.test_table(kpi, from_id, to_id, status, created_at, finished_at) values ('user_logout', 0, 5, 'FINISHED', '2018-08-14T12:46:32Z', '2018-08-14T12:46:32Z');" | |
] | |
for qry in insert_dummy_data_statements: | |
query(cursor=cur, query=qry) | |
# query 2 rows | |
my_query = "SELECT * FROM test_table LIMIT 2" | |
query(cursor=cur, query=my_query) | |
results: List[Tuple[Any]] = cur.fetchall() | |
for row in results: | |
print(row) | |
# get only 1 row | |
my_query = "SELECT count(*) FROM test_table" | |
query(cursor=cur, query=my_query) | |
results: List[Tuple[Any]] = cur.fetchone() | |
print(results[0]) | |
# get X amount of rows | |
number_of_rows_to_fetch = 3 | |
my_query = "SELECT * FROM test_table" | |
query(cursor=cur, query=my_query) | |
results: List[Tuple[Any]] = cur.fetchmany(number_of_rows_to_fetch) | |
for row in results: | |
print(row) | |
[more info] | |
For more information, please RTFM: https://github.com/mfenniak/pg8000 | |
""" | |
import pg8000 | |
def get_pg_client(user: str, host: str, port: int, db: str, pwd: str, auto_commit=True, ssl=False): | |
""" Utility function to get connection and cursor for postgres interaction """ | |
client = pg8000.connect(user=user, host=host, port=port, | |
database=db, password=pwd, ssl=ssl) | |
client.autocommit = auto_commit | |
cur = client.cursor() | |
return client, cur | |
def close_db_conn(conn, cur) -> None: | |
""" Utility function to close an existing DB connection to avoid idle sessions """ | |
try: | |
cur.close() | |
conn.close() | |
except pg8000.core.InterfaceError as e: | |
pass | |
def query(cursor, query: str, logger=None) -> None: | |
""" DRY """ | |
if logger: | |
logger.info("Executing insert query: {}".format(query)) | |
cursor.execute(query) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment