Skip to content

Instantly share code, notes, and snippets.

@haf
Created May 15, 2021 16:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save haf/5449a94c5ab954e9f27a70d6a07afe0b to your computer and use it in GitHub Desktop.
Save haf/5449a94c5ab954e9f27a70d6a07afe0b to your computer and use it in GitHub Desktop.
class PostgresExecute(Task):
    """
    Task for executing a single query against a Postgres database.

    Every argument given here is stored on the task and used as the default
    for the argument of the same name in `run`.

    Args:
        - user (str): username used to authenticate, default 'postgres'
        - password (str): password used to authenticate, default None
        - host (str): database host address, default 'localhost'
        - port (int, optional): port used to connect to Postgres database, defaults to 5432
            if not provided
        - db_name (str): name of Postgres database, default 'default'
        - query (str, optional): query to execute against database
        - parameters (tuple, optional): query/command parameters; ordinal/positional
            inside a tuple
        - commit (bool, optional): set to False to roll back the transaction,
            defaults to True
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor
    """

    def __init__(
        self,
        user: str = 'postgres',
        password: str = None,
        host: str = 'localhost',
        port: int = 5432,
        db_name: str = 'default',
        query: str = None,
        parameters: tuple = None,
        commit: bool = True,
        **kwargs
    ):
        self.user = user
        self.password = password
        self.db_name = db_name
        self.host = host
        self.port = port
        self.query = query
        self.parameters = parameters
        self.commit = commit
        super().__init__(**kwargs)

    # NOTE: every attribute name must be its own comma-separated string.
    # Adjacent string literals without a comma are concatenated by Python
    # into one (non-existent) attribute name.
    @defaults_from_attrs(
        "user",
        "password",
        "host",
        "port",
        "db_name",
        "query",
        "parameters",
        "commit",
    )
    def run(
        self,
        user: str = None,
        password: str = None,
        db_name: str = None,
        host: str = None,
        port: int = None,
        query: str = None,
        parameters: tuple = None,
        commit: bool = True,
    ):
        """
        Task run method. Executes a query against Postgres database.

        Args:
            - user (str): username used to authenticate
            - password (str): password used to authenticate
            - db_name (str): name of Postgres database
            - host (str): database host address
            - port (int, optional): port used to connect to Postgres database
            - query (str, optional): query to execute against database
            - parameters (tuple, optional): query/command parameters; ordinal/positional
                inside a tuple
            - commit (bool, optional): set to False to roll back the transaction
                instead of committing it

        Returns:
            - the value returned by `cursor.execute` (psycopg2 documents this as None)

        Raises:
            - ValueError: if query parameter is None or a blank string
            - DatabaseError: if exception occurs when executing the query
        """
        if not query:
            raise ValueError("A query string must be provided")

        conn = None
        try:
            # connect to database; psycopg2 exceptions are allowed to propagate
            conn = pg.connect(
                host=host,
                port=port,
                dbname=db_name,
                user=user,
                password=password,
            )

            # `with conn` rolls the transaction back if the body raises and
            # commits on a clean exit, so the commit/rollback decision has to
            # be made *inside* the block for `commit=False` to take effect.
            with conn, conn.cursor() as cursor:
                executed = cursor.execute(query=query, vars=parameters)
                if commit:
                    conn.commit()
                else:
                    conn.rollback()

            return executed
        finally:
            # ensure connection is closed, but only if it was ever opened
            if conn is not None:
                conn.close()
class PostgresExecuteMany(Task):
    """
    Task for executing one query many times against a Postgres database, once
    per entry of a list of parameter tuples.

    Every argument given here is stored on the task and used as the default
    for the argument of the same name in `run`.

    Args:
        - user (str): user name used to authenticate, default 'postgres'
        - password (str): password used to authenticate, default None
        - host (str): database host address, default 'localhost'
        - port (int, optional): port used to connect to Postgres database, defaults to 5432
            if not provided
        - db_name (str): name of Postgres database, default 'default'
        - query (str, optional): query to execute against database
        - parameters_list (List[tuple], optional): list of values to use in query; each
            tuple must match the placeholders in the query string
        - commit (bool, optional): set to False to roll back the transaction,
            defaults to True
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor
    """

    def __init__(
        self,
        user: str = 'postgres',
        password: str = None,
        host: str = 'localhost',
        port: int = 5432,
        db_name: str = 'default',
        query: str = None,
        parameters_list: "list[tuple]" = None,
        commit: bool = True,
        **kwargs
    ):
        self.user = user
        self.password = password
        self.db_name = db_name
        self.host = host
        self.port = port
        self.query = query
        self.parameters_list = parameters_list
        self.commit = commit
        super().__init__(**kwargs)

    # NOTE: every attribute name must be its own comma-separated string.
    # Adjacent string literals without a comma are concatenated by Python
    # into one (non-existent) attribute name.
    @defaults_from_attrs(
        "user",
        "password",
        "host",
        "port",
        "db_name",
        "query",
        "parameters_list",
        "commit",
    )
    def run(
        self,
        user: str = None,
        password: str = None,
        db_name: str = None,
        host: str = None,
        port: int = None,
        query: str = None,
        parameters_list: "list[tuple]" = None,
        commit: bool = True,
    ):
        """
        Task run method. Executes many queries against Postgres database.

        Args:
            - user (str): user name used to authenticate
            - password (str): password used to authenticate; should be provided from
                a `Secret` task
            - db_name (str): name of Postgres database
            - host (str): database host address
            - port (int, optional): port used to connect to Postgres database
            - query (str, optional): query to execute against database
            - parameters_list (List[tuple], optional): list of values to use in query;
                each tuple must match the placeholders in the query string
            - commit (bool, optional): set to False to roll back the transaction
                instead of committing it

        Returns:
            - the value returned by `cursor.executemany` (psycopg2 documents this
                as None)

        Raises:
            - ValueError: if query is None or a blank string, or if parameters_list
                is None or empty
            - DatabaseError: if exception occurs when executing the query
        """
        if not query:
            raise ValueError("A query string must be provided")
        if not parameters_list:
            raise ValueError("A parameter list must be provided")

        conn = None
        try:
            # connect to database; psycopg2 exceptions are allowed to propagate
            conn = pg.connect(
                dbname=db_name,
                user=user,
                password=password,
                host=host,
                port=port,
            )

            # `with conn` rolls the transaction back if the body raises and
            # commits on a clean exit, so the commit/rollback decision has to
            # be made *inside* the block for `commit=False` to take effect.
            with conn, conn.cursor() as cursor:
                executed = cursor.executemany(query=query, vars_list=parameters_list)
                if commit:
                    conn.commit()
                else:
                    conn.rollback()

            return executed
        finally:
            # ensure connection is closed, but only if it was ever opened
            if conn is not None:
                conn.close()
class PostgresFetch(Task):
    """
    Task for fetching results of query from Postgres database.

    Every argument given here is stored on the task and used as the default
    for the argument of the same name in `run`.

    Args:
        - user (str): user name used to authenticate, default 'postgres'
        - password (str): password used to authenticate, default None
        - host (str): database host address, default 'localhost'
        - port (int, optional): port used to connect to Postgres database, defaults to 5432
            if not provided
        - db_name (str): name of Postgres database, default 'default'
        - fetch (str, optional): one of "one" "many" or "all", used to determine how many
            results to fetch from executed query, defaults to "one"
        - fetch_count (int, optional): if fetch = 'many', determines the number of results to
            fetch, defaults to 10
        - query (str, optional): query to execute against database
        - parameters (tuple, optional): query/command parameters; ordinal/positional
            inside a tuple
        - commit (bool, optional): set to False to roll back the transaction,
            defaults to True
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor
    """

    def __init__(
        self,
        user: str = 'postgres',
        password: str = None,
        host: str = 'localhost',
        port: int = 5432,
        db_name: str = 'default',
        fetch: str = "one",
        fetch_count: int = 10,
        query: str = None,
        parameters: tuple = None,
        commit: bool = True,
        **kwargs
    ):
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.db_name = db_name
        self.fetch = fetch
        self.fetch_count = fetch_count
        self.query = query
        self.parameters = parameters
        self.commit = commit
        super().__init__(**kwargs)

    # NOTE: every attribute name must be its own comma-separated string.
    # Adjacent string literals without a comma are concatenated by Python
    # into one (non-existent) attribute name.
    @defaults_from_attrs(
        "user",
        "password",
        "host",
        "port",
        "db_name",
        "fetch",
        "fetch_count",
        "query",
        "parameters",
        "commit",
    )
    def run(
        self,
        user: str = None,
        password: str = None,
        host: str = None,
        port: int = None,
        db_name: str = None,
        fetch: str = "one",
        fetch_count: int = 10,
        query: str = None,
        parameters: tuple = None,
        commit: bool = True,
        col_names: bool = False,
    ):
        """
        Task run method. Executes a query against Postgres database and fetches results.

        Args:
            - user (str): user name used to authenticate
            - password (str): password used to authenticate; should be provided from
                a `Secret` task
            - host (str): database host address
            - port (int, optional): port used to connect to Postgres database
            - db_name (str): name of Postgres database
            - fetch (str, optional): one of "one" "many" or "all", used to determine how many
                results to fetch from executed query
            - fetch_count (int, optional): if fetch = 'many', determines the number of results
                to fetch, defaults to 10
            - query (str, optional): query to execute against database
            - parameters (tuple, optional): values to use in query, must be specified using
                placeholders in the query string
            - commit (bool, optional): set to False to roll back the transaction
                instead of committing it
            - col_names (bool, optional): set to True to add column names to records,
                defaults to False

        Returns:
            - records (tuple or list of tuples): records from provided query. With
                `col_names=True` the result is always a list whose first element is
                the tuple of column names, followed by the fetched rows (no rows at
                all if `fetch="one"` found nothing).

        Raises:
            - ValueError: if query parameter is None or a blank string, or if fetch is
                not one of "one", "many", "all"
            - DatabaseError: if exception occurs when executing the query
        """
        if not query:
            raise ValueError("A query string must be provided")

        if fetch not in {"one", "many", "all"}:
            raise ValueError(
                "The 'fetch' parameter must be one of the following - ('one', 'many', 'all')"
            )

        conn = None
        try:
            # connect to database; psycopg2 exceptions are allowed to propagate
            conn = pg.connect(
                dbname=db_name,
                user=user,
                password=password,
                host=host,
                port=port,
            )

            # `with conn` rolls the transaction back if the body raises and
            # commits on a clean exit, so the commit/rollback decision has to
            # be made *inside* the block for `commit=False` to take effect.
            with conn, conn.cursor() as cursor:
                cursor.execute(query=query, vars=parameters)

                # fetch results
                if fetch == "all":
                    records = cursor.fetchall()
                elif fetch == "many":
                    records = cursor.fetchmany(fetch_count)
                else:
                    records = cursor.fetchone()

                if col_names:
                    # first item of each description entry is the column name
                    header = tuple(column[0] for column in cursor.description)
                    if fetch == "one":
                        # fetchone yields a single row (or None), not a list of
                        # rows; wrap it so the header can be prepended.
                        rows = [] if records is None else [records]
                    else:
                        rows = list(records)
                    records = [header] + rows

                if commit:
                    conn.commit()
                else:
                    conn.rollback()

            return records
        finally:
            # ensure connection is closed, but only if it was ever opened
            if conn is not None:
                conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment