Created
May 15, 2021 16:14
-
-
Save haf/5449a94c5ab954e9f27a70d6a07afe0b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PostgresExecute(Task): | |
""" | |
Task for executing a query against a Postgres database. | |
Args: | |
- user (str): username used to authenticate, default 'postgres' | |
- password (str): password used to authenticate, default None | |
- host (str): database host address, default 'localhost' | |
- port (int, optional): port used to connect to Postgres database, defaults to 5432 if | |
not provided | |
- db_name (str): name of Postgres database, default 'default' | |
- query (str, optional): query to execute against database | |
- parameters (tuple, optional): query/command parameters; ordinal/positional inside a tuple | |
- commit (bool, optional): set to False to roll back the transaction, defaults to True | |
- **kwargs (dict, optional): additional keyword arguments to pass to the | |
Task constructor | |
""" | |
def __init__( | |
self, | |
user: str = 'postgres', | |
password: str = None, | |
host: str = 'localhost', | |
port: int = 5432, | |
db_name: str = 'default', | |
query: str = None, | |
parameters: tuple = None, | |
commit: bool = True, | |
**kwargs | |
): | |
self.user = user | |
self.password = password | |
self.db_name = db_name | |
self.host = host | |
self.port = port | |
self.query = query | |
self.parameters = parameters | |
self.commit = commit | |
super().__init__(**kwargs) | |
@defaults_from_attrs( | |
"user", | |
"password", | |
"host", | |
"port", | |
"db_name", | |
"query", | |
"parameters" | |
"commit" | |
) | |
def run( | |
self, | |
user: str = None, | |
password: str = None, | |
db_name: str = None, | |
host: str = None, | |
port: int = None, | |
query: str = None, | |
parameters: tuple = None, | |
commit: bool = True, | |
): | |
""" | |
Task run method. Executes a query against Postgres database. | |
Args: | |
- user (str): username used to authenticate, default 'postgres' | |
- password (str): password used to authenticate, default None | |
- host (str): database host address, default 'localhost' | |
- port (int, optional): port used to connect to Postgres database, defaults to 5432 if | |
not provided | |
- db_name (str): name of Postgres database, default 'default' | |
- query (str, optional): query to execute against database | |
- parameters (tuple, optional): query/command parameters; ordinal/positional inside a tuple | |
- commit (bool, optional): set to False to roll back the transaction, defaults to True | |
Returns: | |
- None | |
Raises: | |
- ValueError: if query parameter is None or a blank string | |
- DatabaseError: if exception occurs when executing the query | |
""" | |
if not query: | |
raise ValueError("A query string must be provided") | |
# connect to database, open cursor (which creates a txn) | |
# allow psycopg2 to pass through any exceptions raised | |
conn = None | |
# try to execute query | |
# context manager automatically rolls back failed transactions | |
try: | |
conn = pg.connect( | |
host=host, | |
port=port, | |
dbname=db_name, | |
user=user, | |
password=password, | |
) | |
with conn, conn.cursor() as cursor: | |
executed = cursor.execute(query=query, vars=parameters) | |
if commit: | |
conn.commit() | |
else: | |
conn.rollback() | |
return executed | |
# ensure connection is closed | |
finally: | |
if conn is not None: | |
conn.close() | |
class PostgresExecuteMany(Task): | |
""" | |
Task for executing many queries against a Postgres database. | |
Args: | |
- db_name (str): name of Postgres database | |
- user (str): user name used to authenticate | |
- host (str): database host address | |
- port (int, optional): port used to connect to Postgres database, defaults to 5432 if | |
not provided | |
- query (str, optional): query to execute against database | |
is query string | |
- parameters_list (List[tuple], optional): list of values to use in query, must be specified using | |
placeholder | |
- commit (bool, optional): set to True to commit transaction, defaults to false | |
- **kwargs (dict, optional): additional keyword arguments to pass to the | |
Task constructor | |
""" | |
def __init__( | |
self, | |
user: str = 'postgres', | |
password: str = None, | |
host: str = 'localhost', | |
port: int = 5432, | |
db_name: str = 'default', | |
query: str = None, | |
parameters_list: "list[tuple]" = None, | |
commit: bool = True, | |
**kwargs | |
): | |
self.user = user | |
self.password = password | |
self.db_name = db_name | |
self.host = host | |
self.port = port | |
self.query = query | |
self.parameters_list = parameters_list | |
self.commit = commit | |
super().__init__(**kwargs) | |
@defaults_from_attrs( | |
"user", | |
"password", | |
"host", | |
"port", | |
"db_name", | |
"query", | |
"parameters_list" | |
"commit" | |
) | |
def run( | |
self, | |
user: str = None, | |
password: str = None, | |
db_name: str = None, | |
host: str = None, | |
port: int = None, | |
query: str = None, | |
parameters_list: "list[tuple]" = None, | |
commit: bool = True, | |
): | |
""" | |
Task run method. Executes many queries against Postgres database. | |
Args: | |
- query (str, optional): query to execute against database | |
- parameters_list (List[tuple], optional): list of values to use in query, must be specified using | |
placeholder | |
- commit (bool, optional): set to True to commit transaction, defaults to false | |
- password (str): password used to authenticate; should be provided from a `Secret` task | |
Returns: | |
- None | |
Raises: | |
- ValueError: if query parameter is None or a blank string | |
- DatabaseError: if exception occurs when executing the query | |
""" | |
if not query: | |
raise ValueError("A query string must be provided") | |
if not parameters_list: | |
raise ValueError("A parameter list must be provided") | |
conn = None | |
try: | |
# connect to database, open cursor | |
# allow psycopg2 to pass through any exceptions raised | |
conn = pg.connect( | |
dbname=db_name, | |
user=user, | |
password=password, | |
host=host, | |
port=port, | |
) | |
# try to execute query | |
# context manager automatically rolls back failed transactions | |
with conn, conn.cursor() as cursor: | |
executed = cursor.executemany(query=query, vars_list=parameters_list) | |
if commit: | |
conn.commit() | |
else: | |
conn.rollback() | |
return executed | |
# ensure connection is closed | |
finally: | |
if conn is not None: | |
conn.close() | |
class PostgresFetch(Task): | |
""" | |
Task for fetching results of query from Postgres database. | |
Args: | |
- db_name (str): name of Postgres database | |
- user (str): user name used to authenticate | |
- host (str): database host address | |
- port (int, optional): port used to connect to Postgres database, defaults to 5432 if | |
not provided | |
- fetch (str, optional): one of "one" "many" or "all", used to determine how many | |
results to fetch from executed query | |
- fetch_count (int, optional): if fetch = 'many', determines the number of results to | |
fetch, defaults to 10 | |
- query (str, optional): query to execute against database | |
- parameters (tuple, optional): query/command parameters; ordinal/positional inside a tuple | |
- commit (bool, optional): set to True to commit transaction, defaults to false | |
- **kwargs (dict, optional): additional keyword arguments to pass to the | |
Task constructor | |
""" | |
def __init__( | |
self, | |
user: str = 'postgres', | |
password: str = None, | |
host: str = 'localhost', | |
port: int = 5432, | |
db_name: str = 'default', | |
fetch: str = "one", | |
fetch_count: int = 10, | |
query: str = None, | |
parameters: tuple = None, | |
commit: bool = True, | |
**kwargs | |
): | |
self.user = user | |
self.password = password | |
self.host = host | |
self.port = port | |
self.db_name = db_name | |
self.fetch = fetch | |
self.fetch_count = fetch_count | |
self.query = query | |
self.parameters = parameters | |
self.commit = commit | |
super().__init__(**kwargs) | |
@defaults_from_attrs( | |
"user", | |
"password", | |
"host", | |
"port", | |
"db_name", | |
"fetch", | |
"fetch_count", | |
"query", | |
"parameters" | |
"commit" | |
) | |
def run( | |
self, | |
user: str = None, | |
password: str = None, | |
host: str = None, | |
port: int = None, | |
db_name: str = None, | |
fetch: str = "one", | |
fetch_count: int = 10, | |
query: str = None, | |
parameters: tuple = None, | |
commit: bool = True, | |
col_names: bool = False, | |
): | |
""" | |
Task run method. Executes a query against Postgres database and fetches results. | |
Args: | |
- fetch (str, optional): one of "one" "many" or "all", used to determine how many | |
results to fetch from executed query | |
- fetch_count (int, optional): if fetch = 'many', determines the number of results | |
to fetch, defaults to 10 | |
- query (str, optional): query to execute against database | |
- parameters (tuple, optional): values to use in query, must be specified using | |
placeholder is query string | |
- commit (bool, optional): set to True to commit transaction, defaults to false | |
- password (str): password used to authenticate; should be provided from a `Secret` task | |
- col_names (bool, optional): set to True to add column names to records, defaults to False | |
Returns: | |
- records (tuple or list of tuples): records from provided query | |
Raises: | |
- ValueError: if query parameter is None or a blank string | |
- DatabaseError: if exception occurs when executing the query | |
""" | |
if not query: | |
raise ValueError("A query string must be provided") | |
if fetch not in {"one", "many", "all"}: | |
raise ValueError( | |
"The 'fetch' parameter must be one of the following - ('one', 'many', 'all')" | |
) | |
# connect to database, open cursor | |
# allow psycopg2 to pass through any exceptions raised | |
conn = pg.connect( | |
dbname=db_name, | |
user=user, | |
password=password, | |
host=host, | |
port=port, | |
) | |
# try to execute query | |
# context manager automatically rolls back failed transactions | |
try: | |
with conn, conn.cursor() as cursor: | |
cursor.execute(query=query, vars=parameters) | |
# fetch results | |
if fetch == "all": | |
records = cursor.fetchall() | |
elif fetch == "many": | |
records = cursor.fetchmany(fetch_count) | |
else: | |
records = cursor.fetchone() | |
if commit: | |
conn.commit() | |
if col_names: | |
names_list = [ | |
col_description[0] for col_description in cursor.description | |
] | |
header = [tuple(col_name for col_name in names_list)] | |
records = header + records | |
return records | |
# ensure connection is closed | |
finally: | |
conn.close() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment