Skip to content

Instantly share code, notes, and snippets.

@brenolf
Last active June 20, 2017 03:28
Show Gist options
  • Save brenolf/f689effeab855c6a231d7073e09f9a29 to your computer and use it in GitHub Desktop.
Save brenolf/f689effeab855c6a231d7073e09f9a29 to your computer and use it in GitHub Desktop.
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from itertools import combinations
from itertools import product
from sqlalchemy.types import *
# Conexao para o BD
engine = create_engine('postgres://breno@127.0.0.1:5432/dw')
df = pd.read_csv('./staging.csv')
df = df.rename(columns = {
'HUMAN_FATAL': 'vitimas_fatais',
'HUMAN_INJ': 'vitimas_sobreviventes',
'DMG_THOUS': 'custo_reparos',
'numero_de_tornados': 'numero_de_tornados'
})
CONSTANT = 5000
"""
Cria hierarquias dado um dataframe. Copia o dataframe, coloca-o abaixo do
carregado, deixando os dados da 1a coluna como nulos e mudando sua granularidade
para o nome da 2a coluna. Reajusta o indice no fim. Continua o processo ate
a ultima coluna.
"""
def criar_hierarquia(df):
df = df.copy().drop_duplicates().reset_index(drop=True)
d = df.copy()
d.loc[:, 'id'] = df.index + 1
LEN = len(df.columns) - 1
columns = df.columns.tolist()
for i in xrange(1, LEN):
c = columns[:i]
aux = df.copy()
aux[c] = None
aux = aux.drop_duplicates()
aux.loc[:, 'granularidade'] = columns[i]
aux.loc[:, 'id'] = aux.index + 1 + CONSTANT * i
d = pd.concat([d, aux])
return d
"""
Carrega os dados de data, mudando os nomes das colunas
"""
def carregar_data():
d = df[['PERIODO', 'MES', 'ESTACAO', 'ANO']].copy()
d = d.rename(columns={
'PERIODO': 'periodo',
'MES': 'mes',
'ESTACAO': 'estacao',
'ANO': 'ano'
})
d['granularidade'] = 'periodo'
return criar_hierarquia(d)
"""
Carrega os dados de local, mudando os nomes das colunas
"""
def carregar_local():
C1 = df[['CIDADE_ORIGEM', 'PROVINCIA_ORIGEM']].copy()
C2 = df[['CIDADE_DESTINO', 'PROVINCIA_DESTINO']].copy().rename(columns={
'CIDADE_DESTINO': 'CIDADE_ORIGEM',
'PROVINCIA_DESTINO': 'PROVINCIA_ORIGEM'
})
d = pd.concat([C1, C2])
d = d.rename(columns={
'CIDADE_ORIGEM': 'cidade',
'PROVINCIA_ORIGEM': 'provincia'
})
d['granularidade'] = 'cidade'
return criar_hierarquia(d)
"""
Carrega os dados de temperatura, mudando os nomes das colunas
"""
def carregar_temperatura():
d = df[['CELCIUS_CLASSE', 'FAHRENHEIT_CLASSE', 'KELVIN_CLASSE']].copy().drop_duplicates()
d = d.rename(columns={
'CELCIUS_CLASSE': 'celcius',
'FAHRENHEIT_CLASSE': 'fahrenheit',
'KELVIN_CLASSE': 'kelvin'
}).reset_index(drop=True)
d['id'] = (d.index + 1) * CONSTANT
return d
"""
Carrega os dados de escala fujita
"""
def carregar_fujita():
d = df[['FUJITA']].copy().drop_duplicates()
d = d.rename(columns={
'FUJITA': 'id'
}).reset_index(drop=True)
return d
"""
Carrega todas as combinacoes possives de dimensoes
"""
def carregar_malha():
dimensoes = ['Temperatura', 'LocalOrigem', 'LocalDestino', 'Fujita', 'Data']
X = []
for i in xrange(0, 6):
C = combinations(dimensoes, i)
X.append(list(C))
return X
"""
Chama todas as funcoes de dimensoes e as registra no banco
"""
def criar_dimensoes():
dims = ['data', 'local', 'temperatura']
sql = """
ALTER TABLE
{}
ADD PRIMARY KEY
(id)
"""
for d in dims:
data = globals()['carregar_' + d]()
data.to_sql(d, engine, index=False, dtype={'id': BigInteger})
engine.execute(sql.format(d, d, d))
"""
Retorna um DataFrame com colunas de ids corretas para as dimensoes a que se
referem com base no atributo `dimensoes`
"""
def adicionar_indices(original, dimensoes):
df = original.copy()
campos = {
'Temperatura': ['CELCIUS_CLASSE', 'FAHRENHEIT_CLASSE', 'KELVIN_CLASSE'],
'LocalOrigem': ['CIDADE_ORIGEM', 'PROVINCIA_ORIGEM'],
'LocalDestino': ['CIDADE_DESTINO', 'PROVINCIA_DESTINO'],
'Fujita': ['FUJITA'],
'Data': ['PERIODO', 'MES', 'ESTACAO', 'ANO']
}
joins = {
'Temperatura': ['celcius', 'fahrenheit', 'kelvin'],
'LocalOrigem': ['cidade', 'provincia'],
'LocalDestino': ['cidade', 'provincia'],
'Fujita': ['id'],
'Data': ['periodo', 'mes', 'estacao', 'ano']
}
fks = {
'Temperatura': 'temperatura_media',
'LocalOrigem': 'local_origem',
'LocalDestino': 'local_destino',
'Fujita': 'escala_fujita',
'Data': 'data_id'
}
tables = {
'Temperatura': carregar_temperatura(),
'LocalOrigem': carregar_local(),
'LocalDestino': carregar_local(),
'Fujita': carregar_fujita(),
'Data': carregar_data()
}
for d in dimensoes:
for c in campos[d]:
if not (c in df.columns):
df[c] = None
df = df.merge(tables[d], left_on=campos[d], right_on=joins[d])
df = df.rename(columns = {
'id': fks[d]
})
return df
"""
Cria chaves estrangeiras no banco
"""
def criar_chaves(tabela, dimensoes):
if len(dimensoes) == 0:
return
sql_FK = """
ALTER TABLE
{}
ADD CONSTRAINT
{}
FOREIGN KEY
({})
REFERENCES
{} (id)
"""
sql_PK = """
ALTER TABLE
{}
ADD PRIMARY KEY
({})
"""
dims = {
'Temperatura': 'temperatura',
'LocalOrigem': 'local',
'LocalDestino': 'local',
'Data': 'data'
}
fks = {
'Temperatura': 'temperatura_media',
'LocalOrigem': 'local_origem',
'LocalDestino': 'local_destino',
'Data': 'data_id',
'Fujita': 'escala_fujita'
}
pk = ', '.join(map(lambda x: fks[x], dimensoes))
engine.execute(sql_PK.format(tabela, pk))
dimensoes = set(dimensoes) - set(['Fujita'])
for d in dimensoes:
constraint = 'FK_' + tabela + '__' + d.lower()
engine.execute(sql_FK.format(tabela, constraint, fks[d], dims[d]))
"""
Cria agregados
"""
def criar_agregados():
malha = carregar_malha()
fks = {
'Temperatura': 'temperatura_media',
'LocalOrigem': 'local_origem',
'LocalDestino': 'local_destino',
'Fujita': 'escala_fujita',
'Data': 'data_id'
}
hierarquias = {
'Temperatura': [['CELCIUS_CLASSE', 'FAHRENHEIT_CLASSE', 'KELVIN_CLASSE']],
'LocalOrigem': [['CIDADE_ORIGEM', 'PROVINCIA_ORIGEM'], ['PROVINCIA_ORIGEM']],
'LocalDestino': [['CIDADE_DESTINO', 'PROVINCIA_DESTINO'], ['PROVINCIA_DESTINO']],
'Fujita': [['FUJITA']],
'Data': [
['PERIODO', 'MES', 'ESTACAO', 'ANO'],
['MES', 'ESTACAO', 'ANO'],
['ESTACAO', 'ANO'],
['ANO']
]
}
metricas = ['vitimas_fatais', 'vitimas_sobreviventes', 'custo_reparos', 'numero_de_tornados']
for m in malha:
for agregado in m:
nome = '_'.join(agregado)
nome = 'Fato' if nome == 'Temperatura_LocalOrigem_LocalDestino_Fujita_Data' else nome
nome = 'Tudo' if nome == '' else nome
nome = ('a_' + nome).lower()
print 'agregado', nome, '\n\n'
campos = [hierarquias[dimensao] for dimensao in agregado]
campos = list(product(*campos))
campos = [reduce(lambda x, y: x + y, c, []) for c in campos]
coluna_agregado = map(lambda x: fks[x], agregado)
coluna_agregado = list(set(coluna_agregado))
tabela = pd.DataFrame(columns=coluna_agregado + metricas)
for c in campos:
if len(c) == 0:
x = df
else:
x = df.groupby(c, as_index=False)
x = x[metricas].sum()
x = adicionar_indices(x, agregado)
x = x[(coluna_agregado + metricas)]
tabela = tabela.append(x, ignore_index=True)
dtypes = dict((fk, BigInteger) for fk in coluna_agregado)
tabela.to_sql(nome, engine, index=False, dtype=dtypes)
criar_chaves(nome, agregado)
criar_dimensoes()
criar_agregados()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment