Last active
March 23, 2023 23:05
-
-
Save mdefende/22ec7c190bb6338c0612a929d8a6707f to your computer and use it in GitHub Desktop.
Simple helper functions for interfacing with a PostgreSQL database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from io import StringIO, BytesIO
import pickle

import pandas as pd
import psycopg2
# Postgres connector functions


def connect(params_dic):
    """Connect to the PostgreSQL database server.

    ``params_dic`` is a dictionary of keyword arguments forwarded to
    ``psycopg2.connect``, for example::

        {
            'user': '<username>',
            'password': '<password>',
            'host': '<host_ip>',
            'database': '<database_name>',
            'port': '<port_num>',
            'options': '<extra_psql_options>',
        }

    libpq/psycopg2 call the login role ``'user'``.  For backward
    compatibility, a ``'username'`` key (as earlier versions of this
    docstring suggested) is accepted as an alias for ``'user'`` when
    ``'user'`` itself is not given.  The caller's dict is never modified.

    ``'options'`` can carry extra server settings, such as adding a schema
    to the search path, e.g. ``'-c search_path=myschema,public'``.

    Returns the open connection object, or ``None`` if the attempt failed.
    On failure the error is printed rather than raised.
    """
    print('Connecting to the PostgreSQL database...')
    try:
        # Work on a copy so the alias handling never mutates the caller's dict.
        params = dict(params_dic)
        if 'username' in params and 'user' not in params:
            params['user'] = params.pop('username')
        conn = psycopg2.connect(**params)
    except Exception as error:  # psycopg2.DatabaseError is a subclass of Exception
        # Best-effort helper: report the problem and signal failure with None
        # instead of (wrongly) announcing success.
        print(error)
        return None
    print("Connection successful")
    return conn

def copy_from_stringio(conn, df, table):
    """Bulk-load ``df`` into ``table`` with one COPY via an in-memory buffer.

    The DataFrame is serialised with ``DataFrame.to_csv`` (tab-separated,
    no index, no header row) and streamed to the server with
    ``COPY ... FROM STDIN`` in CSV format.  CSV format is used because it is
    what ``to_csv`` actually produces:

    * fields that ``to_csv`` quotes (those containing tabs, quotes or
      newlines) are unquoted correctly instead of being stored with literal
      quote characters;
    * backslashes in the data are kept literally rather than being
      interpreted as escapes;
    * missing values, which ``to_csv`` writes as empty unquoted fields, load
      as NULL.  Empty strings are written the same way and will generally
      load as NULL too.

    Columns are matched by position, since no column list is sent.  ``table``
    is inserted into the statement verbatim (so it may be schema-qualified).
    It must therefore be a trusted identifier, never user-supplied text.

    Commits on success and returns None.  On any error the transaction is
    rolled back and the message is returned as ``"Error: <error>"`` instead
    of being raised.
    """
    # Serialise the frame into an in-memory text buffer for COPY to read.
    buffer = StringIO()
    df.to_csv(buffer, sep='\t', index=False, header=False)
    buffer.seek(0)

    # E'\t' is a PostgreSQL escape-string literal for the tab delimiter.
    copy_sql = "COPY " + table + " FROM STDIN WITH (FORMAT csv, DELIMITER E'\\t')"

    cursor = conn.cursor()
    try:
        cursor.copy_expert(copy_sql, buffer)
        conn.commit()
    except Exception as error:  # includes psycopg2.DatabaseError
        conn.rollback()
        return "Error: %s" % error
    finally:
        # Close the cursor on both the success and the error path.
        cursor.close()
    print("copy_from_stringio() done")
    return None


# Original note: "All three of these query commands can be combined into a
# single function with the subquery passed as an argument. Need to refactor
# later."  generic_query below accepts the subquery as its ``query`` argument.
def generic_query(conn, query):
    """Run a SELECT-style ``query`` and return its result as a DataFrame.

    The query is wrapped in ``COPY (...) TO STDOUT WITH CSV HEADER``.  The
    server's CSV output is collected in an in-memory bytes buffer and parsed
    with ``pandas.read_csv``.  Column names come from the CSV header, and
    dtypes are inferred by pandas from the text.

    ``query`` is inserted into the COPY statement verbatim, so it must be
    trusted SQL.  Two adjustments make ordinary queries work:

    * trailing semicolons and whitespace are removed, because a statement
      terminator inside ``COPY ( ... )`` is a syntax error;
    * the query is placed on its own lines, so a trailing ``--`` comment
      cannot comment out the closing parenthesis.

    Both the cursor and the buffer are closed even if the query or parsing
    fails.
    """
    inner = query.rstrip('; \t\r\n\f\v')
    copy_query = 'COPY (\n' + inner + '\n) TO STDOUT WITH CSV HEADER;'
    with BytesIO() as buf:
        # psycopg2 cursors close themselves when the with-block exits.
        # copy_expert's optional size argument only affects COPY FROM, so it
        # is not needed here.
        with conn.cursor() as cursor:
            cursor.copy_expert(copy_query, buf)
        buf.seek(0)
        df = pd.read_csv(buf, engine='c', low_memory=False)
    return df
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment