@neylsoncrepalde
Created November 23, 2022 22:45
from airflow.decorators import dag, task
from datetime import datetime
import pandas as pd
import requests
import json
import boto3
import pymongo
from sqlalchemy import create_engine
from airflow.models import Variable

# Fetch the credentials registered as Airflow Variables
aws_access_key_id = Variable.get('aws_access_key_id')
aws_secret_access_key = Variable.get('aws_secret_access_key')
mongo_password = Variable.get('mongo_password')

s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key
)

# Define the default_args shared by every task
default_args = {
    'owner': 'Neylson Crepalde',
    'depends_on_past': False,
    'start_date': datetime(2022, 9, 21),
}

# Define the DAG and its tasks
@dag(default_args=default_args, schedule_interval='@once', catchup=False, description='XPe btc EDD final challenge', tags=['mongo', 'python', 'EDD', 'Postgres'])
def igti_desafio_final_edd():
    """
    A flow that pulls IBGE data from a MongoDB database and from the IBGE
    mesorregiões API, lands it in the S3 data lake, and loads it into a local
    PostgreSQL data warehouse.
    """
    @task
    def extrai_mongo():
        # Dump the PNAD survey collection from MongoDB Atlas to a local CSV
        data_path = '/tmp/pnadc20203.csv'
        client = pymongo.MongoClient(f'mongodb+srv://estudante_igti:{mongo_password}@unicluster.ixhvw.mongodb.net/ibge?retryWrites=true&w=majority')
        db = client.ibge
        pnad_collec = db.pnadc20203
        df = pd.DataFrame(list(pnad_collec.find()))
        df.to_csv(data_path, index=False, encoding='utf-8', sep=';')
        return data_path

    @task
    def data_check(file_name):
        # Quick sanity check: read the CSV back and print it to the task log
        df = pd.read_csv(file_name, sep=';')
        print(df)

    @task
    def extrai_api():
        # Fetch the list of Minas Gerais mesorregiões from the IBGE localities API
        data_path = '/tmp/dimensao_mesorregioes_mg.csv'
        url = 'https://servicodados.ibge.gov.br/api/v1/localidades/estados/MG/mesorregioes'
        response = requests.get(url)
        response_json = json.loads(response.text)
        df = pd.DataFrame(response_json)[['id', 'nome']]
        df.to_csv(data_path, index=False, encoding='utf-8', sep=';')
        return data_path

    @task
    def upload_to_s3(file_name):
        # file_name[5:] strips the '/tmp/' prefix to build the S3 object key
        print(f"Got filename: {file_name}")
        print(f"Got object_name: {file_name[5:]}")
        s3_client.upload_file(file_name, 'datalake-ney-xp-edc', f"raw-data/desafio_final_edd/{file_name[5:]}")

    @task
    def write_to_postgres(csv_file_path):
        engine = create_engine('postgresql://postgres:postgres@postgres:5432/postgres')
        df = pd.read_csv(csv_file_path, sep=';')
        # The PNAD extract is filtered to women aged 20 to 40 before loading
        if csv_file_path == "/tmp/pnadc20203.csv":
            df = df.loc[(df.idade >= 20) & (df.idade <= 40) & (df.sexo == 'Mulher')]
        # The target table name is the file name without the '/tmp/' prefix and '.csv' suffix
        df.to_sql(csv_file_path[5:-4], engine, if_exists='replace', index=False, method='multi', chunksize=1000)

    # Orchestration
    mongo = extrai_mongo()
    api = extrai_api()
    checagem = data_check(mongo)
    upmongo = upload_to_s3(mongo)
    upapi = upload_to_s3(api)
    wrmongo = write_to_postgres(mongo)
    wrapi = write_to_postgres(api)
    mongo >> checagem >> [upmongo, wrmongo]
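    # Resulting graph: the MongoDB branch runs extrai_mongo >> data_check >> [upload_to_s3, write_to_postgres];
    # the API branch keeps only the implicit data-passing edges extrai_api >> [upload_to_s3, write_to_postgres]
    # that TaskFlow creates from the calls above.
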
execucao = igti_desafio_final_edd()
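
The DAG reads three Airflow Variables (aws_access_key_id, aws_secret_access_key and mongo_password) at parse time, so they must exist before the scheduler loads the file. A minimal sketch of one way to register them from a Python shell on the Airflow host, using placeholder values rather than real credentials:

# Minimal sketch: register the Variables this DAG reads (placeholder values, not real credentials)
from airflow.models import Variable

Variable.set('aws_access_key_id', 'YOUR-ACCESS-KEY-ID')
Variable.set('aws_secret_access_key', 'YOUR-SECRET-ACCESS-KEY')
Variable.set('mongo_password', 'YOUR-MONGO-PASSWORD')

The same Variables can also be created through the Airflow UI under Admin > Variables.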