Created
July 19, 2023 13:22
-
-
Save thiagodeschamps/41eb6fa6c3d18933bb2a7447b0b8ccf1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import sqlite3 | |
import psycopg2 | |
import pandas.io.sql as sqlio | |
import mysql.connector | |
import pymongo | |
def extract_from_excel(excel_file):
    """Load an Excel workbook into a DataFrame.

    Args:
        excel_file: Whatever ``pandas.read_excel`` accepts as its first
            argument (e.g. a path); it is forwarded unchanged.

    Returns:
        The DataFrame produced by ``pandas.read_excel`` with its default
        options.
    """
    frame = pd.read_excel(excel_file)
    return frame
def extract_from_postgresql(db_name, query, user='username', password='password'):
    """Run *query* against a PostgreSQL database and return the rows as a DataFrame.

    Args:
        db_name: Name of the database to connect to.
        query: SQL text handed to ``pandas.io.sql.read_sql_query``.
        user: Login role. Defaults to the placeholder the script shipped with.
        password: Password for *user*. Defaults to the shipped placeholder.

    Returns:
        A DataFrame holding the query result.
    """
    conn = psycopg2.connect(database=db_name, user=user, password=password)
    try:
        return sqlio.read_sql_query(query, conn)
    finally:
        # Close explicitly: psycopg2's ``with conn:`` only ends the
        # transaction, it does not release the connection.
        conn.close()
def extract_from_mysql(db_name, query, user='username', password='password'):
    """Run *query* against a MySQL database and return the rows as a DataFrame.

    Args:
        db_name: Name of the database to connect to.
        query: SQL text executed on a fresh cursor.
        user: Login user. Defaults to the placeholder the script shipped with.
        password: Password for *user*. Defaults to the shipped placeholder.

    Returns:
        A DataFrame with one row per fetched record. Columns are labelled
        with the names reported by ``cursor.description`` so the result can
        be indexed by column name like the frames from the other extractors.
    """
    conn = mysql.connector.connect(database=db_name, user=user, password=password)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
            # DB-API: description is a sequence of 7-item tuples whose first
            # item is the column name; it is None when there is no result set.
            description = cursor.description
            columns = [col[0] for col in description] if description else None
        finally:
            cursor.close()
    finally:
        conn.close()
    return pd.DataFrame(rows, columns=columns)
def extract_from_mongodb(db_name, collection_name, uri="mongodb://localhost:27017/"):
    """Fetch every document of a MongoDB collection into a DataFrame.

    Args:
        db_name: Name of the database that holds the collection.
        collection_name: Name of the collection to read in full.
        uri: MongoDB connection string. Defaults to the local server the
            script originally hard-coded.

    Returns:
        A DataFrame built from the list of documents returned by ``find()``.
    """
    client = pymongo.MongoClient(uri)
    try:
        # Materialise the cursor before the client is closed.
        documents = list(client[db_name][collection_name].find())
    finally:
        client.close()
    return pd.DataFrame(documents)
def transform(data):
    """Return a copy of *data* with ``new_column`` set to ``old_column * 2``.

    The input frame is left untouched, matching
    ``transform_and_replace_null``, which also hands back a new frame;
    callers must use the return value.

    Args:
        data: DataFrame containing an ``old_column`` column.

    Returns:
        A new DataFrame with ``new_column`` added (or overwritten).

    Raises:
        KeyError: If *data* has no ``old_column``.
    """
    return data.assign(new_column=data['old_column'] * 2)
def transform_and_replace_null(data, default_value):
    """Fill missing values, then derive ``new_column`` as ``old_column * 2``.

    Args:
        data: DataFrame containing an ``old_column`` column.
        default_value: Replacement passed to ``DataFrame.fillna``.

    Returns:
        A new DataFrame; the doubling is computed on the already-filled
        values, so filled cells contribute ``default_value * 2``.
    """
    return data.fillna(default_value).assign(
        new_column=lambda filled: filled['old_column'] * 2
    )
def load_to_sql(data, db_name, table_name='table_name', if_exists='fail'):
    """Write *data* into a table of the SQLite database file *db_name*.

    Args:
        data: DataFrame to persist.
        db_name: Path handed to ``sqlite3.connect``.
        table_name: Destination table. Defaults to the name the script
            originally hard-coded.
        if_exists: Forwarded to ``DataFrame.to_sql``. The default ``'fail'``
            keeps the original behaviour; pass ``'replace'`` or ``'append'``
            to make repeated runs succeed.
    """
    conn = sqlite3.connect(db_name)
    try:
        data.to_sql(table_name, conn, if_exists=if_exists)
    finally:
        # Release the database file handle even when to_sql raises.
        conn.close()
def load_to_s3(data, bucket_name, file_name, s3_client=None):
    """Write *data* to a local CSV file and upload that file to a bucket.

    Args:
        data: DataFrame to export with ``DataFrame.to_csv``.
        bucket_name: Destination bucket.
        file_name: Used both as the local CSV path and as the object key.
        s3_client: Object exposing ``upload_file(filename, bucket, key)``.
            When omitted, a module-level ``s3`` is used if one exists.
            NOTE(review): presumably a boto3 S3 client -- confirm against
            the deployment code.

    Raises:
        NameError: If no client is passed and the module defines no ``s3``.
    """
    if s3_client is None:
        # Resolve the client before touching the filesystem so a missing
        # client fails fast instead of leaving a stray CSV behind.
        try:
            s3_client = s3
        except NameError:
            raise NameError(
                "no S3 client available: pass s3_client=... or define a "
                "module-level 's3'"
            ) from None
    data.to_csv(file_name)
    s3_client.upload_file(file_name, bucket_name, file_name)
# Pipeline run: pull the MongoDB collection, zero-fill missing values and
# add the derived column, then export the result as a CSV to the bucket.
data = transform_and_replace_null(
    extract_from_mongodb('database_name', 'collection_name'),
    0,
)
load_to_s3(data, 'bucket_name', 'file_name.csv')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment