Skip to content

Instantly share code, notes, and snippets.

@tomasoak
Last active August 16, 2023 01:14
Show Gist options
  • Save tomasoak/81949ef962e8b61786be2355d81153f5 to your computer and use it in GitHub Desktop.
Save tomasoak/81949ef962e8b61786be2355d81153f5 to your computer and use it in GitHub Desktop.
edb011_activity_03
"""
Main ETL script
Formato table final
Nome do Banco
CNPJ
Classificação do Banco
Quantidade de Clientes do Bancos
Índice de reclamações
Quantidade de reclamações
Índice de satisfação dos funcionários dos bancos
Índice de satisfação com salários dos funcionários dos bancos.
"""
import io
import polars as pl
import boto3
s3 = boto3.client("s3")
bucket_input_name = "edb011-input"
bancos_path_filename = "Bancos/EnquadramentoInicia_v2.tsv"
empregados_path_filename = "Empregados/glassdoor_consolidado_join_match_v2.csv"
class Extract:
def __init__(self, file_name=None):
self.bucket = bucket_input_name
self.file_name = file_name
def read_file(self, separator=None):
obj = s3.get_object(Bucket=self.bucket, Key=self.file_name)
df = pl.read_csv(obj["Body"], separator=separator)
return df
def _transform():
"""Minimal transformation to be able to extract data"""
COLUMN_TYPES = {"Quantidade total de clientes \x96 CCS e SCR": pl.Int64(),
"Quantidade de clientes \x96 CCS": pl.Int64(),
"Quantidade de clientes \x96 SCR": pl.Int64()}
COLUMN_RENAME = {"Quantidade total de clientes \x96 CCS e SCR": "Quantidade total de clientes CCS e SCR",
"Quantidade de clientes \x96 CCS": "Quantidade de clientes CCS",
"Quantidade de clientes \x96 SCR": "Quantidade de clientes SCR"}
return COLUMN_TYPES, COLUMN_RENAME
def read_multi_files(self):
COLUMN_TYPES, COLUMN_RENAME = Extract._transform()
bucket_input_obj = boto3.resource('s3').Bucket(bucket_input_name)
prefix_objs = bucket_input_obj.objects.filter(Prefix="Reclamacoes/")
df = pl.concat([(pl.read_csv(io.BytesIO(obj.get()['Body'].read()),
encoding="latin1",
separator=";",
dtypes=COLUMN_TYPES)
.rename(COLUMN_RENAME))
for obj in prefix_objs])
return df
def transform():
"""Join needed tables, transform and export it"""
BANK_NAME = {
"BANCO TOKYO": "BANCO TOKYO-MITSUBISHI BM S.A.",
"MERCEDES": "MERCEDES-BENZ"
}
bancos = Extract(bancos_path_filename).read_file("\t")
bancos = (
bancos.with_columns(pl.col("Nome").str.split_exact("-", 0)).unnest("Nome").rename({"field_0": "Nome"})
.with_columns(pl.col("Nome").str.strip(" "))
.with_columns(pl.col("Nome").str.replace("BANCO TOKYO", "BANCO TOKYO-MITSUBISHI BM S.A."))
.with_columns(pl.col("Nome").str.replace("MERCEDES", "MERCEDES-BENZ"))
)
empregados = Extract(empregados_path_filename).read_file("|")
bancos_empregados = bancos.join(empregados, on="Nome", how="left")
bancos_empregados = bancos_empregados.with_columns(pl.col("CNPJ").cast(pl.Utf8))
bancos_empregados = bancos_empregados.groupby(["CNPJ", "Nome"]).agg([pl.mean("Remuneração e benefícios")
, pl.mean("Perspectiva positiva da empresa(%)")])
reclamacoes = Extract().read_multi_files()
reclamacoes = reclamacoes.groupby(["CNPJ IF", "Segmento"]).agg([pl.sum("Quantidade total de reclamações")
, pl.sum("Quantidade total de clientes CCS e SCR")])
df = bancos_empregados.join(reclamacoes, left_on="CNPJ", right_on="CNPJ IF")
df = df.rename({"Nome": "nome", "CNPJ": "cnpj",
"Segmento": "classificacao_banco",
"Quantidade total de clientes CCS e SCR": "quantidade_total_clientes",
"Quantidade total de reclamações": "quantidade_total_reclamacoes",
"Remuneração e benefícios": "idx_satisfacao_salario"})
df = df.with_columns((pl.col("quantidade_total_reclamacoes") / pl.col("quantidade_total_clientes")).alias("idx_reclamacoes"))
df.select(pl.col(["nome", "cnpj", "classificacao_banco", "quantidade_total_clientes", "idx_reclamacoes",
"quantidade_total_reclamacoes", "idx_satisfacao_salario"]))
return df
def export():
"""Exports data as csv to S3 Bucket"""
df = transform()
df.write_csv("final_table.csv", separator=",")
if __name__ == "__main__":
export()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment