import re
from functools import reduce

import pandas as pd  # pandas must be installed for toPandas(); importing it here fails fast if missing
from pyspark.sql import DataFrame
class Base(object):
    """Holds shared connection settings; subclasses implement run_query()."""

    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        self._spark = spark
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._database = database

    def run_query(self, query, return_pandasDF=True):
        raise NotImplementedError('This method must be implemented')
# TERADATA
class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        # JDBC reader configured for Teradata, authenticating via LDAP
        self._reader = self._spark.read.format("jdbc") \
            .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
            .option("user", self._username) \
            .option("password", self._password) \
            .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        # Rather than pushing the whole query down in a single JDBC read
        # (self._reader.option('dbtable', f"({query}) as tbl").load()),
        # split it on UNION ALL and run each part individually.
        return self.split_query_and_run_individually(query, r'union all', return_pandasDF)
    def run_queries_and_union_all(self, queries, return_pandasDF=True):
        dataframes = []
        for each_query in queries:
            try:
                spark_df = self._reader.option('dbtable', f"({each_query}) as tbl").load()
                dataframes.append(spark_df)
            except Exception as e:
                # Skip the failing query, but report why it failed
                print(f'Error while reading the query {each_query}: {e}')
        if not dataframes:
            raise ValueError('None of the queries could be read')
        concat_sparkDf = reduce(DataFrame.unionAll, dataframes)
        if return_pandasDF:
            return concat_sparkDf.toPandas()
        else:
            return concat_sparkDf

    def split_query_and_run_individually(self, query, separator=r'union all', return_pandasDF=True):
        # re.IGNORECASE makes the split catch 'UNION ALL', 'union all', etc.
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)
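
# Usage sketch for TeradataWithSpark (hypothetical host/credentials; assumes an
# existing SparkSession named `spark` and the Teradata JDBC driver on the classpath):
#
#   td = TeradataWithSpark(spark, host='td.example.com', database='sales',
#                          username='alice', password='secret')
#   # The query is split on UNION ALL, each piece is loaded as its own JDBC
#   # read, and the results are unioned back into one DataFrame:
#   pdf = td.run_query("select * from t_2021 union all select * from t_2022")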
# POSTGRESQL
class PostgresWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read \
            .format("jdbc") \
            .option("url", f"jdbc:postgresql://{self._host}:{self._port}/{self._database}") \
            .option("driver", "org.postgresql.Driver") \
            .option("user", self._username) \
            .option("password", self._password)

    def run_query(self, query, return_pandasDF=True):
        spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        if return_pandasDF:
            return spark_df.toPandas()
        else:
            return spark_df
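
# Usage sketch for PostgresWithSpark (hypothetical connection details; assumes
# the PostgreSQL JDBC driver is available to Spark):
#
#   pg = PostgresWithSpark(spark, host='localhost', port=5432, database='appdb',
#                          username='alice', password='secret')
#   pdf = pg.run_query("select id, name from customers")         # pandas DataFrame
#   sdf = pg.run_query("select id, name from customers", False)  # Spark DataFrame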
# HIVE
class Hive(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)

    def run_query(self, query, return_pandasDF=True):
        # Hive queries go straight through the SparkSession; no JDBC reader needed
        if return_pandasDF:
            return self._spark.sql(query).toPandas()
        else:
            return self._spark.sql(query)
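
# Usage sketch for Hive (assumes the SparkSession was created with
# .enableHiveSupport() so spark.sql() can reach the Hive metastore):
#
#   hive = Hive(spark)
#   pdf = hive.run_query("select count(*) as n from warehouse.orders")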