Skip to content

Instantly share code, notes, and snippets.

@gustavorps
Created November 16, 2019 21:25
Show Gist options
  • Save gustavorps/726fb1d4eeec3e11dda2cb284b5c0c40 to your computer and use it in GitHub Desktop.
Save gustavorps/726fb1d4eeec3e11dda2cb284b5c0c40 to your computer and use it in GitHub Desktop.
import re
import os
FILE_DIR = os.path.abspath(__file__).split(os.sep)
import re
from sqlalchemy import types
from sqlalchemy import create_engine
from sqlalchemy_utils import database_exists, create_database
import pandas as pd
import yaml
def parse_type_sqlalchemy(value):
regex = r"^(?P<type>\w+)\((?P<arg>\d{1,3})\)"
test_str = ["char(1)"]
matches = re.match(regex, value)
type_sql = matches.group('type').lower()
if type_sql in {'char', 'character'}:
n = int(matches.group('arg'))
return types.NCHAR(n)
if type_sql in {'varchar', 'varchar2'}:
n = int(matches.group('arg'))
return types.VARCHAR(n)
if type_sql in {'date'}:
n = int(matches.group('arg'))
return types.DATE()
raise NotImplementedError
datapackage = None
with open(os.path.join('./dags/sus_cnes/', "datapackage.yml"), 'r') as stream:
try:
datapackage = yaml.safe_load(stream)
except yaml.YAMLError as exc:
raise ImportError
fields = datapackage['resources'][0]['schema']['fields']
dtype_pd = {f['name']: f.get('_type_pandas', f['type'])
for f in fields if not 'date' in f.get('_type_pandas', '') }
dtype_sql = {f['name']: parse_type_sqlalchemy(f['_type_sql'])
for f in fields if '_type_sql' in f}
cols_dates = (f['name']
for f in fields if ('_type_pandas' in f \
and 'date' in f.get('_type_pandas')))
# print(dtype_pd)
# print(dtype_sql)
# print(list(cols_dates))
# print(converters)
rep = {
"TO_CHAR(": "",
",'DD/MM/YYYY')": "",
}
rep = dict((re.escape(k), v) for k, v in rep.items())
pattern = re.compile("|".join(rep.keys()))
def date_parser(value):
return pd.to_datetime(value, format='%d/%m/%Y', errors='coerce')
df = pd.read_csv(
'/home/gustavorps/workspace/datasets/datasus-cnes/BASE_DE_DADOS_CNES_201909/tbEstabelecimento201909.csv',
sep=';',
dtype=dtype_pd,
parse_dates=list(cols_dates),
date_parser=date_parser,
nrows=5000)
# print(df.info(memory_usage='deep'))
df.columns = [pattern.sub(lambda m: rep[re.escape(m.group(0))], c)
for c in df.columns]
# print(df.info(memory_usage='deep'))
# print(df.memory_usage(deep=True))
engine = create_engine('postgresql://odoo:odoo@localhost:5432/cnes_raw')
if not database_exists(engine.url):
create_database(engine.url)
df.to_sql('tbEstabelecimento',
engine,
dtype=dtype_sql,
if_exists='replace',
chunksize=1000,)
name: CNES
resources:
- name: TB_ESTABELECIMENTO
description: >
LFCES004
Estabelecimentos de Saúde
path: "./"
profile: tabular-data-resource
schema:
primaryKey: CO_UNIDADE
fields:
- name: CO_ATIVIDADE
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: CHAR(2)
- name: CO_ATIVIDADE_PRINCIPAL
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(3)
- name: CO_CEP
type: integer
_type_pandas: string_
# _type_pandas: Int64
_type_sql: VARCHAR2(8)
- name: CO_CLIENTELA
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: CHAR(2)
- name: CO_CNES
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(7)
- name: CO_CPFDIRETORCLN
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(11)
- name: CO_DISTRITO_ADMINISTRATIVO
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(4)
- name: CO_DISTRITO_SANITARIO
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(4)
- name: CO_ESTADO_GESTOR
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: CHAR(2)
- name: CO_MICRO_REGIAO
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(6)
- name: CO_MOTIVO_DESAB
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(2)
- name: CO_MUNICIPIO_GESTOR
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(7)
- name: CO_NATUREZA_JUR
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(04)
- name: CO_REGIAO_SAUDE
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(4)
- name: CO_TIPO_ESTABELECIMENTO
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(04)
- name: CO_TIPO_UNIDADE
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: CHAR(2)
- name: CO_TURNO_ATENDIMENTO
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: CHAR(2)
- name: CO_UNIDADE
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(31)
- name: CO_USUARIO
type: string
_type_pandas: category
# _type_pandas: Int64
_type_sql: VARCHAR2(12)
- name: TO_CHAR(DT_ATU_GEO,'DD/MM/YYYY')
_rename: DT_ATU_GEO
type: date
format: "%d/%m/%Y"
_type_pandas: datetime64[D]
# TODO: remote parenteses
_type_sql: date(5)
- name: TO_CHAR(DT_ATUALIZACAO,'DD/MM/YYYY')
_rename: DT_ATUALIZACAO
type: date
format: "%d/%m/%Y"
_type_pandas: datetime64[D]
# TODO: remote parenteses
_type_sql: date(5)
- name: TO_CHAR(DT_ATUALIZACAO_ORIGEM,'DD/MM/YYYY')
_rename: DT_ATUALIZACAO_ORIGEM
type: date
format: "%d/%m/%Y"
_type_pandas: datetime64[D]
# TODO: remote parenteses
_type_sql: date(5)
- name: TO_CHAR(DT_EXPEDICAO,'DD/MM/YYYY')
_rename: DT_ATUALIZACAO_ORIGEM
type: date
format: "%d/%m/%Y"
_type_pandas: datetime64[D]
# TODO: remote parenteses
_type_sql: date(5)
- name: TO_CHAR(DT_VAL_LIC_SANI,'DD/MM/YYYY')
_rename: DT_VAL_LIC_SANI
type: date
format: "%d/%m/%Y"
_type_pandas: datetime64[D]
# TODO: remote parenteses
_type_sql: date(5)
- name: NIVEL_DEP
type: Int64
# _type_sql:
- name: NO_BAIRRO
type: category
# _type_sql:
- name: NO_COMPLEMENTO
type: string_
# _type_sql:
- name: NO_EMAIL
type: category
# _type_sql:
- name: NO_FANTASIA
type: category
# _type_sql:
- name: NO_FANTASIA_ABREV
type: string_
# _type_sql:
- name: NO_LOGRADOURO
type: string_
# _type_sql:
- name: NO_RAZAO_SOCIAL
type: string_
# _type_sql:
- name: NO_URL
type: string_
# _type_sql:
- name: NO_USUARIO_GEO
type: category
# _type_sql:
- name: NU_ALVARA
type: string_
# _type_sql:
- name: NU_CNPJ
type: category
# _type_sql:
- name: NU_CNPJ_MANTENEDORA
type: string
_type_pandas: U
_type_sql: char(15)
- name: NU_CPF
type: category
# _type_sql:
- name: NU_ENDERECO
type: string_
# _type_sql:
- name: NU_FAX
type: string_
# _type_sql:
- name: NU_LATITUDE
type: string_
# _type_sql:
- name: NU_LONGITUDE
type: string_
# _type_sql:
- name: NU_TELEFONE
type: string_
# _type_sql:
- name: REG_DIRETORCLN
type: string_
# _type_sql:
- name: ST_ADESAO_FILANTROP
type: string_
# _type_sql:
- name: ST_CONEXAO_INTERNET
type: string_
# _type_sql:
- name: ST_CONTRATO_FORMALIZADO
type: string_
# _type_sql:
- name: ST_GERACREDITO_GERENTE_SGIF
type: string_
# _type_sql:
- name: TP_ESTAB_SEMPRE_ABERTO
type: category
# _type_sql:
- name: TP_GESTAO
type: category
# _type_sql:
- name: TP_LIC_SANI
type: category
# _type_sql:
- name: TP_ORGAO_EXPEDIDOR
type: category
# _type_sql:
- name: TP_PFPJ
type: category
# _type_sql:
- name: TP_UNIDADE
type: category
# _type_sql:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment