Skip to content

Instantly share code, notes, and snippets.

@thiagodeschamps
Created July 19, 2023 13:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thiagodeschamps/d73092809b5d81f9b8ec2ea026892277 to your computer and use it in GitHub Desktop.
Save thiagodeschamps/d73092809b5d81f9b8ec2ea026892277 to your computer and use it in GitHub Desktop.
from abc import ABC, abstractmethod
import os
import sqlite3
import tempfile

import boto3
from botocore.exceptions import NoCredentialsError
import mysql.connector
import pandas as pd
import pandas.io.sql as sqlio
import psycopg2
import pymongo
class AbstractExtractor(ABC):
    """Interface for the "extract" stage of an ETL pipeline."""

    @abstractmethod
    def extract(self):
        """Produce the raw data to be transformed; subclasses must override."""
class AbstractTransformer(ABC):
    """Interface for the "transform" stage of an ETL pipeline."""

    @abstractmethod
    def transform(self, data):
        """Return a transformed version of ``data``; subclasses must override."""
class AbstractLoader(ABC):
    """Interface for the "load" stage of an ETL pipeline."""

    @abstractmethod
    def load(self, data):
        """Persist ``data`` to the target store; subclasses must override."""
class ExcelExtractor(AbstractExtractor):
    """Extract tabular data from an Excel workbook."""

    def __init__(self, excel_file):
        # Forwarded unchanged to pandas.read_excel (path or file-like object).
        self.excel_file = excel_file

    def extract(self):
        """Read the workbook with ``pandas.read_excel`` defaults and return the DataFrame."""
        workbook_source = self.excel_file
        return pd.read_excel(io=workbook_source)
class PostgreSQLExtractor(AbstractExtractor):
    """Extract the result of a SQL query from a PostgreSQL database.

    Args:
        db_name: Name of the database to connect to.
        query: SQL query whose result set becomes the DataFrame.
        user: Database user. The default is a placeholder value.
        password: Database password. The default is a placeholder value.
        **connect_kwargs: Extra keyword arguments (e.g. ``host``, ``port``)
            forwarded to ``psycopg2.connect``.
    """

    def __init__(self, db_name, query, user='username', password='password',
                 **connect_kwargs):
        self.db_name = db_name
        self.query = query
        self.user = user
        self.password = password
        self.connect_kwargs = connect_kwargs

    def extract(self):
        """Run ``self.query`` and return its result as a DataFrame.

        The connection is always closed, even if the query fails.
        """
        conn = psycopg2.connect(
            database=self.db_name,
            user=self.user,
            password=self.password,
            **self.connect_kwargs,
        )
        try:
            return sqlio.read_sql_query(self.query, conn)
        finally:
            # Without this the connection leaked on every extract() call.
            conn.close()
class MySQLExtractor(AbstractExtractor):
    """Extract the result of a SQL query from a MySQL database.

    Args:
        db_name: Name of the database to connect to.
        query: SQL query whose result set becomes the DataFrame.
        user: Database user. The default is a placeholder value.
        password: Database password. The default is a placeholder value.
        **connect_kwargs: Extra keyword arguments (e.g. ``host``, ``port``)
            forwarded to ``mysql.connector.connect``.
    """

    def __init__(self, db_name, query, user='username', password='password',
                 **connect_kwargs):
        self.db_name = db_name
        self.query = query
        self.user = user
        self.password = password
        self.connect_kwargs = connect_kwargs

    def extract(self):
        """Run ``self.query`` and return its rows as a DataFrame.

        Column labels are taken from the cursor's result description, so
        downstream transformers can address columns by name. The cursor and
        connection are always closed.
        """
        conn = mysql.connector.connect(
            database=self.db_name,
            user=self.user,
            password=self.password,
            **self.connect_kwargs,
        )
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(self.query)
                rows = cursor.fetchall()
                # description is a sequence of per-column tuples whose first
                # item is the column name.
                columns = ([col[0] for col in cursor.description]
                           if cursor.description else None)
            finally:
                cursor.close()
        finally:
            conn.close()
        return pd.DataFrame(rows, columns=columns)
class MongoDBExtractor(AbstractExtractor):
    """Extract documents from a MongoDB collection into a DataFrame.

    Args:
        db_name: Database name.
        collection_name: Collection to read.
        uri: MongoDB connection string (defaults to a local server).
        query: Optional filter document passed to ``Collection.find``;
            ``None`` selects every document.
    """

    def __init__(self, db_name, collection_name,
                 uri="mongodb://localhost:27017/", query=None):
        self.db_name = db_name
        self.collection_name = collection_name
        self.uri = uri
        self.query = query

    def extract(self):
        """Return the matching documents as a DataFrame (one row per document).

        The client is closed once the documents have been read.
        """
        client = pymongo.MongoClient(self.uri)
        try:
            collection = client[self.db_name][self.collection_name]
            # Materialise the cursor while the client is still open.
            documents = list(collection.find(self.query))
        finally:
            client.close()
        return pd.DataFrame(documents)
class SimpleTransformer(AbstractTransformer):
    """Add a column holding a scaled copy of an existing column.

    Args:
        source_column: Column to read.
        target_column: Column to create or overwrite.
        factor: Multiplier applied to ``source_column``.
    """

    def __init__(self, source_column='old_column', target_column='new_column',
                 factor=2):
        self.source_column = source_column
        self.target_column = target_column
        self.factor = factor

    def transform(self, data):
        """Return a new DataFrame with ``target_column = source_column * factor``.

        The input DataFrame is left unmodified.

        Raises:
            KeyError: if ``source_column`` is missing from ``data``.
        """
        # assign() works on a copy, so the caller's frame is not mutated.
        scaled = data[self.source_column] * self.factor
        return data.assign(**{self.target_column: scaled})
class NullReplacingTransformer(AbstractTransformer):
    """Replace missing values, then add ``new_column`` as twice ``old_column``."""

    def __init__(self, default_value):
        # Value used for every missing entry, in all columns.
        self.default_value = default_value

    def transform(self, data):
        """Return a new DataFrame with nulls filled and ``new_column`` added.

        Raises:
            KeyError: if ``old_column`` is missing from ``data``.
        """
        filled = data.fillna(value=self.default_value)
        doubled = filled['old_column'] * 2
        return filled.assign(new_column=doubled)
# Concrete loader implementations
class SQLLoader(AbstractLoader):
    """Write a DataFrame into a table of a SQLite database.

    Args:
        db_name: Path of the SQLite database file (``":memory:"`` also works).
        table_name: Destination table.
        if_exists: Passed to ``DataFrame.to_sql``: ``'fail'``, ``'replace'``
            or ``'append'``.
        index: Whether to write the DataFrame index as a column.
    """

    def __init__(self, db_name, table_name='table_name', if_exists='fail',
                 index=True):
        self.db_name = db_name
        self.table_name = table_name
        self.if_exists = if_exists
        self.index = index

    def load(self, data):
        """Write ``data`` to ``self.table_name``.

        The connection is always closed, even if the write fails.

        Raises:
            ValueError: if the table exists and ``if_exists`` is ``'fail'``.
        """
        conn = sqlite3.connect(self.db_name)
        try:
            data.to_sql(self.table_name, conn,
                        if_exists=self.if_exists, index=self.index)
        finally:
            # Without this the connection was leaked on every load() call.
            conn.close()
class S3Loader(AbstractLoader):
    """Upload a DataFrame to Amazon S3 as a CSV object.

    Args:
        bucket_name: Target S3 bucket.
        file_name: S3 object key to write.
    """

    def __init__(self, bucket_name, file_name):
        self.bucket_name = bucket_name
        self.file_name = file_name

    def load(self, data):
        """Serialise ``data`` to CSV and upload it to ``s3://bucket_name/file_name``.

        The CSV is staged in a private temporary directory that is deleted
        afterwards. Nothing is written to, or overwritten in, the current
        working directory.
        """
        s3 = boto3.client('s3')
        with tempfile.TemporaryDirectory() as staging_dir:
            local_path = os.path.join(staging_dir, 'upload.csv')
            # pandas to_csv defaults: comma-separated, row index included.
            data.to_csv(local_path)
            s3.upload_file(local_path, self.bucket_name, self.file_name)
class ETL:
    """Wire an extractor, a transformer and a loader into one pipeline run.

    Components are duck-typed. They need ``extract()``, ``transform(data)``
    and ``load(data)`` methods respectively.
    """

    def __init__(self, extractor, transformer, loader):
        self.extractor = extractor
        self.transformer = transformer
        self.loader = loader

    def process(self):
        """Run extract -> transform -> load once; returns ``None``."""
        raw = self.extractor.extract()
        self.loader.load(self.transformer.transform(raw))
if __name__ == "__main__":
    # Example wiring with placeholder names; replace them before running.
    # The guard keeps a plain `import` of this module from reading MongoDB
    # and uploading to S3.
    etl = ETL(
        MongoDBExtractor('database_name', 'collection_name'),
        NullReplacingTransformer(0),
        S3Loader('bucket_name', 'file_name.csv'),
    )
    etl.process()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment