Last active
June 20, 2017 03:28
-
-
Save brenolf/f689effeab855c6a231d7073e09f9a29 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import numpy as np | |
from sqlalchemy import create_engine | |
from itertools import combinations | |
from itertools import product | |
from sqlalchemy.types import * | |
# Conexao para o BD | |
engine = create_engine('postgres://breno@127.0.0.1:5432/dw') | |
df = pd.read_csv('./staging.csv') | |
df = df.rename(columns = { | |
'HUMAN_FATAL': 'vitimas_fatais', | |
'HUMAN_INJ': 'vitimas_sobreviventes', | |
'DMG_THOUS': 'custo_reparos', | |
'numero_de_tornados': 'numero_de_tornados' | |
}) | |
CONSTANT = 5000 | |
""" | |
Cria hierarquias dado um dataframe. Copia o dataframe, coloca-o abaixo do | |
carregado, deixando os dados da 1a coluna como nulos e mudando sua granularidade | |
para o nome da 2a coluna. Reajusta o indice no fim. Continua o processo ate | |
a ultima coluna. | |
""" | |
def criar_hierarquia(df): | |
df = df.copy().drop_duplicates().reset_index(drop=True) | |
d = df.copy() | |
d.loc[:, 'id'] = df.index + 1 | |
LEN = len(df.columns) - 1 | |
columns = df.columns.tolist() | |
for i in xrange(1, LEN): | |
c = columns[:i] | |
aux = df.copy() | |
aux[c] = None | |
aux = aux.drop_duplicates() | |
aux.loc[:, 'granularidade'] = columns[i] | |
aux.loc[:, 'id'] = aux.index + 1 + CONSTANT * i | |
d = pd.concat([d, aux]) | |
return d | |
""" | |
Carrega os dados de data, mudando os nomes das colunas | |
""" | |
def carregar_data(): | |
d = df[['PERIODO', 'MES', 'ESTACAO', 'ANO']].copy() | |
d = d.rename(columns={ | |
'PERIODO': 'periodo', | |
'MES': 'mes', | |
'ESTACAO': 'estacao', | |
'ANO': 'ano' | |
}) | |
d['granularidade'] = 'periodo' | |
return criar_hierarquia(d) | |
""" | |
Carrega os dados de local, mudando os nomes das colunas | |
""" | |
def carregar_local(): | |
C1 = df[['CIDADE_ORIGEM', 'PROVINCIA_ORIGEM']].copy() | |
C2 = df[['CIDADE_DESTINO', 'PROVINCIA_DESTINO']].copy().rename(columns={ | |
'CIDADE_DESTINO': 'CIDADE_ORIGEM', | |
'PROVINCIA_DESTINO': 'PROVINCIA_ORIGEM' | |
}) | |
d = pd.concat([C1, C2]) | |
d = d.rename(columns={ | |
'CIDADE_ORIGEM': 'cidade', | |
'PROVINCIA_ORIGEM': 'provincia' | |
}) | |
d['granularidade'] = 'cidade' | |
return criar_hierarquia(d) | |
""" | |
Carrega os dados de temperatura, mudando os nomes das colunas | |
""" | |
def carregar_temperatura(): | |
d = df[['CELCIUS_CLASSE', 'FAHRENHEIT_CLASSE', 'KELVIN_CLASSE']].copy().drop_duplicates() | |
d = d.rename(columns={ | |
'CELCIUS_CLASSE': 'celcius', | |
'FAHRENHEIT_CLASSE': 'fahrenheit', | |
'KELVIN_CLASSE': 'kelvin' | |
}).reset_index(drop=True) | |
d['id'] = (d.index + 1) * CONSTANT | |
return d | |
""" | |
Carrega os dados de escala fujita | |
""" | |
def carregar_fujita(): | |
d = df[['FUJITA']].copy().drop_duplicates() | |
d = d.rename(columns={ | |
'FUJITA': 'id' | |
}).reset_index(drop=True) | |
return d | |
""" | |
Carrega todas as combinacoes possives de dimensoes | |
""" | |
def carregar_malha(): | |
dimensoes = ['Temperatura', 'LocalOrigem', 'LocalDestino', 'Fujita', 'Data'] | |
X = [] | |
for i in xrange(0, 6): | |
C = combinations(dimensoes, i) | |
X.append(list(C)) | |
return X | |
""" | |
Chama todas as funcoes de dimensoes e as registra no banco | |
""" | |
def criar_dimensoes(): | |
dims = ['data', 'local', 'temperatura'] | |
sql = """ | |
ALTER TABLE | |
{} | |
ADD PRIMARY KEY | |
(id) | |
""" | |
for d in dims: | |
data = globals()['carregar_' + d]() | |
data.to_sql(d, engine, index=False, dtype={'id': BigInteger}) | |
engine.execute(sql.format(d, d, d)) | |
""" | |
Retorna um DataFrame com colunas de ids corretas para as dimensoes a que se | |
referem com base no atributo `dimensoes` | |
""" | |
def adicionar_indices(original, dimensoes): | |
df = original.copy() | |
campos = { | |
'Temperatura': ['CELCIUS_CLASSE', 'FAHRENHEIT_CLASSE', 'KELVIN_CLASSE'], | |
'LocalOrigem': ['CIDADE_ORIGEM', 'PROVINCIA_ORIGEM'], | |
'LocalDestino': ['CIDADE_DESTINO', 'PROVINCIA_DESTINO'], | |
'Fujita': ['FUJITA'], | |
'Data': ['PERIODO', 'MES', 'ESTACAO', 'ANO'] | |
} | |
joins = { | |
'Temperatura': ['celcius', 'fahrenheit', 'kelvin'], | |
'LocalOrigem': ['cidade', 'provincia'], | |
'LocalDestino': ['cidade', 'provincia'], | |
'Fujita': ['id'], | |
'Data': ['periodo', 'mes', 'estacao', 'ano'] | |
} | |
fks = { | |
'Temperatura': 'temperatura_media', | |
'LocalOrigem': 'local_origem', | |
'LocalDestino': 'local_destino', | |
'Fujita': 'escala_fujita', | |
'Data': 'data_id' | |
} | |
tables = { | |
'Temperatura': carregar_temperatura(), | |
'LocalOrigem': carregar_local(), | |
'LocalDestino': carregar_local(), | |
'Fujita': carregar_fujita(), | |
'Data': carregar_data() | |
} | |
for d in dimensoes: | |
for c in campos[d]: | |
if not (c in df.columns): | |
df[c] = None | |
df = df.merge(tables[d], left_on=campos[d], right_on=joins[d]) | |
df = df.rename(columns = { | |
'id': fks[d] | |
}) | |
return df | |
""" | |
Cria chaves estrangeiras no banco | |
""" | |
def criar_chaves(tabela, dimensoes): | |
if len(dimensoes) == 0: | |
return | |
sql_FK = """ | |
ALTER TABLE | |
{} | |
ADD CONSTRAINT | |
{} | |
FOREIGN KEY | |
({}) | |
REFERENCES | |
{} (id) | |
""" | |
sql_PK = """ | |
ALTER TABLE | |
{} | |
ADD PRIMARY KEY | |
({}) | |
""" | |
dims = { | |
'Temperatura': 'temperatura', | |
'LocalOrigem': 'local', | |
'LocalDestino': 'local', | |
'Data': 'data' | |
} | |
fks = { | |
'Temperatura': 'temperatura_media', | |
'LocalOrigem': 'local_origem', | |
'LocalDestino': 'local_destino', | |
'Data': 'data_id', | |
'Fujita': 'escala_fujita' | |
} | |
pk = ', '.join(map(lambda x: fks[x], dimensoes)) | |
engine.execute(sql_PK.format(tabela, pk)) | |
dimensoes = set(dimensoes) - set(['Fujita']) | |
for d in dimensoes: | |
constraint = 'FK_' + tabela + '__' + d.lower() | |
engine.execute(sql_FK.format(tabela, constraint, fks[d], dims[d])) | |
""" | |
Cria agregados | |
""" | |
def criar_agregados(): | |
malha = carregar_malha() | |
fks = { | |
'Temperatura': 'temperatura_media', | |
'LocalOrigem': 'local_origem', | |
'LocalDestino': 'local_destino', | |
'Fujita': 'escala_fujita', | |
'Data': 'data_id' | |
} | |
hierarquias = { | |
'Temperatura': [['CELCIUS_CLASSE', 'FAHRENHEIT_CLASSE', 'KELVIN_CLASSE']], | |
'LocalOrigem': [['CIDADE_ORIGEM', 'PROVINCIA_ORIGEM'], ['PROVINCIA_ORIGEM']], | |
'LocalDestino': [['CIDADE_DESTINO', 'PROVINCIA_DESTINO'], ['PROVINCIA_DESTINO']], | |
'Fujita': [['FUJITA']], | |
'Data': [ | |
['PERIODO', 'MES', 'ESTACAO', 'ANO'], | |
['MES', 'ESTACAO', 'ANO'], | |
['ESTACAO', 'ANO'], | |
['ANO'] | |
] | |
} | |
metricas = ['vitimas_fatais', 'vitimas_sobreviventes', 'custo_reparos', 'numero_de_tornados'] | |
for m in malha: | |
for agregado in m: | |
nome = '_'.join(agregado) | |
nome = 'Fato' if nome == 'Temperatura_LocalOrigem_LocalDestino_Fujita_Data' else nome | |
nome = 'Tudo' if nome == '' else nome | |
nome = ('a_' + nome).lower() | |
print 'agregado', nome, '\n\n' | |
campos = [hierarquias[dimensao] for dimensao in agregado] | |
campos = list(product(*campos)) | |
campos = [reduce(lambda x, y: x + y, c, []) for c in campos] | |
coluna_agregado = map(lambda x: fks[x], agregado) | |
coluna_agregado = list(set(coluna_agregado)) | |
tabela = pd.DataFrame(columns=coluna_agregado + metricas) | |
for c in campos: | |
if len(c) == 0: | |
x = df | |
else: | |
x = df.groupby(c, as_index=False) | |
x = x[metricas].sum() | |
x = adicionar_indices(x, agregado) | |
x = x[(coluna_agregado + metricas)] | |
tabela = tabela.append(x, ignore_index=True) | |
dtypes = dict((fk, BigInteger) for fk in coluna_agregado) | |
tabela.to_sql(nome, engine, index=False, dtype=dtypes) | |
criar_chaves(nome, agregado) | |
criar_dimensoes() | |
criar_agregados() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment