Last active
March 7, 2021 15:49
-
-
Save rburhum/ded6a78a0ef2824b8477167aa162a0ce to your computer and use it in GitHub Desktop.
Procesar Casos positivos de datos abiertos (Perú) MINSA
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Notebook-style pipeline: download MINSA positive-case data, load it into
# PostGIS, clean it, and georeference it against INEI census districts.
from datetime import datetime
# Wall-clock start; cumulative durations are printed at several points below.
start_time = datetime.now()
import os
# Open data portal constants
OPENDATA_URL_CASOS_POSITIVOS = 'https://www.datosabiertos.gob.pe/dataset/casos-positivos-por-covid-19-ministerio-de-salud-minsa'
ALTERNATE_URL_CASOS_POSITIVOS = 'https://cloud.minsa.gob.pe/s/Y8w3wHsEdYQSZRp/download' # alternate URL from MINSA cloud service
DESTINATION_CASOS_POSITIVOS_RAW = './data/casos-positivos-raw.csv'
CASOS_POSITIVOS_CLEAN = './data/casos-positivos-utf8.csv'
# Database constants
DESTINATION_CASOS_POSITIVOS = 'casos_positivos' # sanitized table of casos positivos
STATIC_CENSUS_TABLE_NAME = 'censo_peru_distrito' # table from the peruvian census.
LOCAL_DB_NAME = 'covid2'
# Make sure INEI data exists in db, otherwise load it
import psycopg2
# No host/user given, so libpq defaults apply (local socket, current OS user).
conn = psycopg2.connect("dbname=" + LOCAL_DB_NAME)
def table_exists(con, table_str):
    """Return True if a relation named *table_str* exists in the database.

    The name is passed as a bind parameter instead of being concatenated
    into the SQL string, which avoids SQL injection and quoting bugs.
    Any database error is printed and reported as "does not exist".
    """
    exists = False
    try:
        with con.cursor() as cur:
            cur.execute(
                "select exists(select relname from pg_class where relname = %s)",
                (table_str,))
            exists = cur.fetchone()[0]
    except psycopg2.Error as e:
        print(e)
    return exists
# Load the census GDB with ogr2ogr (Jupyter "!" shell magic) only when the
# table is missing; -nln names the destination table.
if not table_exists(conn, STATIC_CENSUS_TABLE_NAME):
    print("Loading " + STATIC_CENSUS_TABLE_NAME + "...")
    !ogr2ogr -nln $STATIC_CENSUS_TABLE_NAME -f PostgreSQL PG:"dbname=$LOCAL_DB_NAME" ./data/inei/LIMDISTRITO_IND_POBREZA_CPV17.gdb
    print("...done")
else:
    print(STATIC_CENSUS_TABLE_NAME + " found in local db")
# NOTE(review): rebinding drops the reference without conn.close();
# explicitly closing before rebinding would be cleaner.
conn = None
# Pick function to download from alternate portal or minsa cloud link | |
import requests | |
def download_from_open_portal():
    """Scrape the datosabiertos.gob.pe dataset page and download the CSV.

    Locates the single download button on the page; fails loudly (assert /
    HTTPError) if the page layout changed or either request errors out.
    Writes the payload to DESTINATION_CASOS_POSITIVOS_RAW.
    """
    from bs4 import BeautifulSoup
    # timeout keeps the notebook from hanging forever on a dead portal
    page = requests.get(OPENDATA_URL_CASOS_POSITIVOS, timeout=60)
    page.raise_for_status()
    soup = BeautifulSoup(page.content, 'html.parser')
    # download data into csv file
    download_section = soup.find_all(class_="btn btn-primary data-link")
    assert len(download_section) == 1  # there can be only one download link unless they changed the page
    r = requests.get(download_section[0].get('href'), allow_redirects=True, timeout=300)
    r.raise_for_status()
    # context manager closes the file handle instead of leaking it
    with open(DESTINATION_CASOS_POSITIVOS_RAW, 'wb') as f:
        f.write(r.content)
def download_from_link():
    """Download the positives CSV directly from the MINSA cloud link.

    Writes the payload to DESTINATION_CASOS_POSITIVOS_RAW; raises
    requests.HTTPError on a failed download instead of silently saving
    an error page as data.
    """
    r = requests.get(ALTERNATE_URL_CASOS_POSITIVOS, timeout=300)
    r.raise_for_status()
    # context manager closes the file handle instead of leaking it
    with open(DESTINATION_CASOS_POSITIVOS_RAW, 'wb') as f:
        f.write(r.content)
# Use the direct MINSA cloud link; the portal scraper is kept as a fallback.
download_from_link()
#download_from_open_portal()
print("File Downloaded to " + DESTINATION_CASOS_POSITIVOS_RAW)
# the data at the MINSA is coming with Windows characters, transform to UTF8
#!iconv -f iso-8859-1 -t utf-8 < $DESTINATION_CASOS_POSITIVOS_RAW > $CASOS_POSITIVOS_CLEAN
# NOTE(review): the iconv step is disabled and the raw file is renamed as-is,
# so the feed is presumably UTF-8 already — confirm against a fresh download.
!mv $DESTINATION_CASOS_POSITIVOS_RAW $CASOS_POSITIVOS_CLEAN
# Load data into postgis
# OGR_TRUNCATE empties casos_positivos_in before appending the fresh CSV.
!ogr2ogr -progress -append --config OGR_TRUNCATE YES -nln casos_positivos_in -f PostgreSQL PG:"dbname=$LOCAL_DB_NAME" ./data/casos-positivos-utf8.csv
# print out some statistics
import pandas as pd
import psycopg2
from IPython.display import display, HTML
conn = None
def get_duplicate_record_count(conn):
    """Return how many distinct row values occur more than once in casos_positivos.

    Rows are compared on their full textual representation
    ((casos_positivos.*)::text); the result counts duplicated row values,
    not the number of extra copies.
    """
    cur = conn.cursor()
    sql = "select count(*) from (SELECT (casos_positivos.*)::text, count(*) from casos_positivos group by casos_positivos.* HAVING count(*) > 1) as x"
    cur.execute(sql)
    # single scalar result: fetchone instead of materializing fetchall
    count = cur.fetchone()[0]
    cur.close()
    return count
def remove_duplicate_records(conn):
    """Delete duplicate rows (same uuid), keeping one copy of each.

    Temporarily adds a SERIAL id column so that exactly one row per uuid
    (the highest id) survives, then drops the column again.

    Returns the number of rows deleted.
    """
    cur = conn.cursor()
    sql = "ALTER table " + DESTINATION_CASOS_POSITIVOS + " ADD COLUMN id SERIAL"
    cur.execute(sql)
    # quick trick to remove duplicate rows and keep only one
    sql = "DELETE FROM " + DESTINATION_CASOS_POSITIVOS + " a USING " + \
          DESTINATION_CASOS_POSITIVOS + " b WHERE a.id < b.id AND a.uuid = b.uuid"
    cur.execute(sql)
    rows_removed = cur.rowcount
    sql = "ALTER table " + DESTINATION_CASOS_POSITIVOS + " DROP COLUMN id"
    cur.execute(sql)
    cur.close()
    return rows_removed
def print_type_of_test_summary(conn, title, table_name):
    """Print *title* followed by an HTML table of case counts per test type.

    Groups *table_name* by metododx (PCR / PR) and formats the counts with
    thousands separators. *table_name* must be a trusted constant — it is
    interpolated into the SQL, not bound as a parameter.
    """
    print(title)
    # removed a stray "casos_positivos_in" alias that was pasted after the
    # table name and silently aliased whatever table was passed in
    result = pd.read_sql("SELECT metododx as tipo_test, to_char(count(*), 'FM9,999,999') FROM " +
                         table_name + " group by metododx", conn)
    display(HTML(result.to_html()))
def connect_and_clean_tables():
    """Rebuild the sanitized casos_positivos table from casos_positivos_in.

    Opens a fresh autocommit connection, drops and recreates the target
    table with fecha_resultado parsed to a real DATE, removes duplicate
    rows and rows with unparseable dates, then adds the primary key,
    geometry placeholder columns, and census attribute columns.

    Returns:
        The open psycopg2 connection, reused by the rest of the pipeline.
    """
    conn = psycopg2.connect("dbname=" + LOCAL_DB_NAME)
    # autocommit so DDL and DELETEs apply immediately without explicit commits
    conn.autocommit = True
    # drop target table if exists
    cur = conn.cursor()
    cur.execute('DROP TABLE IF EXISTS ' + DESTINATION_CASOS_POSITIVOS)
    #recreate it
    # (older feed published DD/MM/YYYY dates; kept for reference)
    #sql = " CREATE TABLE " + DESTINATION_CASOS_POSITIVOS + \
    #      " AS ( SELECT uuid, departamento, provincia, distrito, metododx, edad, sexo," + \
    #      " TO_DATE(fecha_resultado, 'DD/MM/YYYY') as fecha_resultado " +\
    #      " FROM casos_positivos_in )"
    sql = " CREATE TABLE " + DESTINATION_CASOS_POSITIVOS + \
          " AS ( SELECT uuid, departamento, provincia, distrito, metododx, edad, sexo," + \
          " TO_DATE(fecha_resultado, 'YYYYMMDD') as fecha_resultado " +\
          " FROM casos_positivos_in )"
    cur.execute(sql)
    print_type_of_test_summary(conn, 'Original Table Summary', 'casos_positivos_in')
    # report duplicate data
    duplicate_record_count = get_duplicate_record_count(conn)
    print('Found ' + str(duplicate_record_count) + ' duplicate records')
    if duplicate_record_count > 0:
        records_removed = remove_duplicate_records(conn)
        print ('Removed ' + str(records_removed) + ' unnecessary duplicate records')
        assert get_duplicate_record_count(conn) == 0
    print_type_of_test_summary(conn, 'Summary without duplicates - should equal offical #s', DESTINATION_CASOS_POSITIVOS)
    # delete records with invalid dates
    # unparseable input can yield year-1 BC dates from TO_DATE; drop those rows
    sql = "DELETE FROM " + DESTINATION_CASOS_POSITIVOS + " WHERE fecha_resultado = '0001-01-01 BC'"
    cur.execute(sql)
    print("Deleted " + str(cur.rowcount) + " with invalid fecha_resultado")
    print_type_of_test_summary(conn, 'After cleaning table', DESTINATION_CASOS_POSITIVOS)
    # add primary key
    cur.execute("ALTER TABLE " + DESTINATION_CASOS_POSITIVOS + " ADD COLUMN ogc_fid SERIAL PRIMARY KEY")
    # add place holder for geometries
    cur.execute("SELECT AddGeometryColumn ('public','" + DESTINATION_CASOS_POSITIVOS + "','wkb_geometry',4326,'POINT',2);")
    cur.execute("SELECT AddGeometryColumn ('public','" + DESTINATION_CASOS_POSITIVOS + "','shape_district',4326,'MULTIPOLYGON',2);")
    # add fields for census info
    cur.execute("ALTER TABLE " + DESTINATION_CASOS_POSITIVOS + " ADD COLUMN ubigeo varchar(6)")
    cur.execute("ALTER TABLE " + DESTINATION_CASOS_POSITIVOS + " ADD COLUMN pobreza double precision")
    cur.execute("ALTER TABLE " + DESTINATION_CASOS_POSITIVOS + " ADD COLUMN poblacion double precision")
    return conn
conn = connect_and_clean_tables()
def remove_invalid_distrito(conn):
    """Delete rows whose distrito is the 'EN INVESTIGACIÓN' placeholder.

    These rows cannot be georeferenced because they name no real district.
    Prints the number of rows removed.
    """
    cur = conn.cursor()
    sql = "DELETE FROM " + DESTINATION_CASOS_POSITIVOS + " WHERE distrito = 'EN INVESTIGACIÓN'"
    cur.execute(sql)
    print("Deleted " + str(cur.rowcount) + " with invalid distrito")
    cur.close()
remove_invalid_distrito(conn)
def print_stats(conn):
    """Plot daily positive-case counts: PCR only, then all test types.

    ORDER BY fecha_resultado makes the bar-chart x-axis chronological;
    without it the GROUP BY emits rows in an unspecified order.
    """
    df = pd.read_sql("select fecha_resultado, count(fecha_resultado) as count "
                     "from casos_positivos where metododx = 'PCR' "
                     "group by fecha_resultado order by fecha_resultado", conn)
    df.plot.bar(x='fecha_resultado', y='count', figsize=(40, 10), title='Casos positivos por PCR')
    df = pd.read_sql("select fecha_resultado, count(fecha_resultado) as count "
                     "from casos_positivos "
                     "group by fecha_resultado order by fecha_resultado", conn)
    df.plot.bar(x='fecha_resultado', y='count', figsize=(40, 10), title='Casos positivos por PCR y PR')
print_stats(conn)
# georeference casos
def populate_geometry_and_census_info(conn):
    """Normalize INS place names to match INEI census names, then georeference.

    Every case row receives the district polygon (shape_district), a
    representative point inside it (wkb_geometry), and the census
    attributes ubigeo, pobreza, and poblacion. Exits the process with
    status 1 if any row fails to match a census district.
    """
    cur = conn.cursor()
    print("Cleaning data...")
    # Misspelling fixes that make INS names match the INEI census table.
    # Each entry: (column, ins_value, inei_value, departamento filter or
    # None, label used in the progress message).
    # I don't understand how this district's name has an underscore in the
    # INEI data ¯\_(ツ)_/¯  (ANCO_HUALLO)
    name_fixes = [
        ('distrito', 'ANCO HUALLO', 'ANCO_HUALLO', None, 'ANCO HUALLO'),
        ('distrito', 'SAN FCO DE ASIS DE YARUSYACAN', 'SAN FRANCISCO DE ASIS DE YARUSYACAN', None, 'SAN FCO DE ASIS DE YARUSYACAN'),
        ('distrito', 'HUAY HUAY', 'HUAY-HUAY', None, 'HUAY HUAY'),
        ('provincia', 'ANTONIO RAIMONDI', 'ANTONIO RAYMONDI', 'ANCASH', 'ANTONIO RAIMONDI'),
        ('distrito', 'ANDRES AVELINO CACERES D.', 'ANDRES AVELINO CACERES DORREGARAY', 'AYACUCHO', 'ANDRES AVELINO CACERES D.'),
        ('distrito', 'CARMEN DE LA LEGUA-REYNOSO', 'CARMEN DE LA LEGUA REYNOSO', 'CALLAO', 'CARMEN DE LA LEGUA-REYNOSO'),
        ('distrito', 'NAZCA', 'NASCA', 'ICA', 'NAZCA (distrito)'),
        ('provincia', 'NAZCA', 'NASCA', 'ICA', 'NAZCA (provincia)'),
        ('distrito', 'SAN PEDRO DE PUTINA PUNCU', 'SAN PEDRO DE PUTINA PUNCO', 'PUNO', 'SAN PEDRO DE PUTINA PUNCU'),
        ('distrito', 'CORONEL GREGORIO ALBARRACIN L.', 'CORONEL GREGORIO ALBARRACIN LANCHIPA', 'TACNA', 'CORONEL GREGORIO ALBARRACIN L.'),
        ('distrito', 'ESTIQUE PAMPA', 'ESTIQUE-PAMPA', None, 'ESTIQUE PAMPA'),
        ('departamento', 'LIMA REGION', 'LIMA', None, 'LIMA REGION'),
        ('distrito', 'SAN FRANCISCO DE YESO', 'SAN FRANCISCO DEL YESO', None, 'SAN FRANCISCO DE YESO'),
        ('distrito', 'QUITO ARMA', 'QUITO-ARMA', None, 'QUITO ARMA'),
    ]
    for column, old_value, new_value, departamento, label in name_fixes:
        # column names come from the hard-coded list above; the values are
        # passed as bind parameters.
        sql = ("UPDATE " + DESTINATION_CASOS_POSITIVOS +
               " SET " + column + " = %s WHERE " + column + " = %s")
        params = [new_value, old_value]
        if departamento is not None:
            sql += " AND departamento = %s"
            params.append(departamento)
        cur.execute(sql, params)
        print("Cleaned " + str(cur.rowcount) + " with '" + label + "'")
    print("Georeferencing data...")
    # use census to georeference casos and load census data
    sql = ("UPDATE " + DESTINATION_CASOS_POSITIVOS + " casos " +
           "SET wkb_geometry = ST_PointOnSurface(censo.shape), " +
           "shape_district = censo.shape, " +
           "ubigeo = censo.ubigeo, " +
           "pobreza = censo.pobreza, " +
           "poblacion = censo.p_001 " +
           " FROM " + STATIC_CENSUS_TABLE_NAME + " censo WHERE casos.departamento = censo.nombdep AND " +
           "casos.provincia = censo.nombprov AND casos.distrito = censo.nombdist")
    cur.execute(sql)
    print("Verifying georeferenced data")
    # find if any records were not referenced
    sql = "SELECT count(*) FROM " + DESTINATION_CASOS_POSITIVOS + " WHERE wkb_geometry is null"
    cur.execute(sql)
    records_not_referenced = int(cur.fetchone()[0])
    if records_not_referenced > 0:
        # Abort the pipeline: unreferenced rows usually mean a new spelling
        # mismatch that needs an entry in name_fixes above.
        print("Records not georeferenced: " + str(records_not_referenced))
        exit(1)
    else:
        print("All records georeferenced succesfully")
populate_geometry_and_census_info(conn)
end_time = datetime.now()
print('Duration so far: {}'.format(end_time - start_time))
# Run the post-processing SQL (builds the weekly result_table aggregates).
# The context manager closes the file handle instead of leaking it.
with open('process.sql', 'r') as postprocess_file:
    cur = conn.cursor()
    cur.execute(postprocess_file.read())
cur = None
end_time = datetime.now()
print('Duration total after queries: {}'.format(end_time - start_time))
def vacuum():
    """VACUUM ANALYZE the database using the module-level `conn`.

    VACUUM cannot run inside a transaction block, so the connection is
    temporarily switched to autocommit (isolation level 0). The original
    isolation level is restored even if the VACUUM itself fails.
    """
    old_isolation_level = conn.isolation_level
    conn.set_isolation_level(0)
    try:
        cur = conn.cursor()
        cur.execute("VACUUM ANALYZE")
        cur.close()
    finally:
        conn.set_isolation_level(old_isolation_level)
# Reclaim dead tuples and refresh planner statistics after the heavy rewrites.
vacuum()
end_time = datetime.now()
print('Duration total after queries: {}'.format(end_time - start_time))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#creo un folder llamado ./data/inei y pongo esto allí | |
https://www.dropbox.com/t/NdMoLpvp0kUBl1ZY |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Weekly case aggregates per district, polygon geometry.
-- IF EXISTS so the script also works on the very first run.
DROP TABLE IF EXISTS result_table;
CREATE TABLE result_table as (
    SELECT weekly, shape_district as wkb_geometry, round(sum(resultado_count)::numeric,2) as casos_por_semana,
           ubigeo, pobreza, poblacion, ((sum(resultado_count)/poblacion) * 100000) as casos_por_poblacion
    FROM ( SELECT date_trunc('week', fecha_resultado) as weekly, shape_district,
                  count(fecha_resultado) as resultado_count, ubigeo, pobreza, poblacion
           FROM casos_positivos
           GROUP BY fecha_resultado, shape_district, ubigeo, pobreza, poblacion
           ORDER BY fecha_resultado) as x
    GROUP BY weekly, shape_district, ubigeo, pobreza, poblacion
    ORDER BY weekly);
ALTER TABLE result_table ADD COLUMN ogc_fid SERIAL PRIMARY KEY;
ALTER TABLE result_table ADD COLUMN departamento VARCHAR;
ALTER TABLE result_table ADD COLUMN provincia VARCHAR;
ALTER TABLE result_table ADD COLUMN distrito VARCHAR;
-- qualify nombdist with the census alias for consistency with the other UPDATEs
UPDATE result_table as t set departamento = c.nombdep, provincia = c.nombprov, distrito = c.nombdist FROM censo_peru_distrito as c WHERE c.ubigeo = t.ubigeo;
CREATE INDEX result_table_idx on result_table USING GIST(wkb_geometry);

-- Same aggregates keyed on the representative point geometry.
DROP TABLE IF EXISTS result_table_point;
CREATE TABLE result_table_point as (
    SELECT weekly, wkb_geometry, round(sum(resultado_count)::numeric,2) as casos_por_semana,
           ubigeo, pobreza, poblacion, ((sum(resultado_count)/poblacion) * 100000) as casos_por_poblacion
    FROM ( SELECT date_trunc('week', fecha_resultado) as weekly, wkb_geometry,
                  count(fecha_resultado) as resultado_count, ubigeo, pobreza, poblacion
           FROM casos_positivos
           GROUP BY fecha_resultado, wkb_geometry, ubigeo, pobreza, poblacion
           ORDER BY fecha_resultado) as x
    GROUP BY weekly, wkb_geometry, ubigeo, pobreza, poblacion
    ORDER BY weekly);
ALTER TABLE result_table_point ADD COLUMN ogc_fid SERIAL PRIMARY KEY;
ALTER TABLE result_table_point ADD COLUMN departamento VARCHAR;
ALTER TABLE result_table_point ADD COLUMN provincia VARCHAR;
ALTER TABLE result_table_point ADD COLUMN distrito VARCHAR;
UPDATE result_table_point as t set departamento = c.nombdep, provincia = c.nombprov, distrito = c.nombdist FROM censo_peru_distrito as c WHERE c.ubigeo = t.ubigeo;
CREATE INDEX result_table_point_idx on result_table_point USING GIST(wkb_geometry);

-- PCR-only aggregates, point geometry.
DROP TABLE IF EXISTS result_table_pcr;
CREATE TABLE result_table_pcr as (
    SELECT weekly, wkb_geometry, round(sum(resultado_count)::numeric,2) as casos_por_semana,
           ubigeo, pobreza, poblacion, ROUND((sum(resultado_count)/poblacion) * 100000) as casos_por_poblacion
    FROM ( SELECT date_trunc('week', fecha_resultado) as weekly, wkb_geometry,
                  count(fecha_resultado) as resultado_count, ubigeo, pobreza, poblacion
           FROM casos_positivos
           WHERE metododx = 'PCR'
           GROUP BY fecha_resultado, wkb_geometry, ubigeo, pobreza, poblacion
           ORDER BY fecha_resultado) as x
    GROUP BY weekly, wkb_geometry, ubigeo, pobreza, poblacion
    ORDER BY weekly);
ALTER TABLE result_table_pcr ADD COLUMN ogc_fid SERIAL PRIMARY KEY;
ALTER TABLE result_table_pcr ADD COLUMN departamento VARCHAR;
ALTER TABLE result_table_pcr ADD COLUMN provincia VARCHAR;
ALTER TABLE result_table_pcr ADD COLUMN distrito VARCHAR;
UPDATE result_table_pcr as t set departamento = c.nombdep, provincia = c.nombprov, distrito = c.nombdist FROM censo_peru_distrito as c WHERE c.ubigeo = t.ubigeo;
-- spatial index was missing here while its sibling tables have one
CREATE INDEX result_table_pcr_idx on result_table_pcr USING GIST(wkb_geometry);

-- Running total of weekly cases per district.
ALTER TABLE result_table ADD COLUMN casos_acumulados NUMERIC;
UPDATE result_table as r set casos_acumulados = s.casos_acumulados FROM (select ubigeo, sum(casos_por_semana) as casos_acumulados from result_table group by ubigeo) as s where r.ubigeo = s.ubigeo;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
jupyterlab
beautifulsoup4
requests
pandas
psycopg2
matplotlib
#jupyterlab-lsp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def column_exists(conn, schema_name, table_name, column_name):
    """Return True if *column_name* exists on *schema_name*.*table_name*.

    The identifiers are passed as bind parameters (information_schema
    stores them as data), which avoids SQL injection and quoting bugs.
    """
    cur = conn.cursor()
    cur.execute(
        "SELECT EXISTS (SELECT 1 FROM information_schema.columns "
        "WHERE table_schema = %s AND table_name = %s AND column_name = %s)",
        (schema_name, table_name, column_name))
    return cur.fetchone()[0]

# Add the date column only when it is missing, so re-runs don't fail.
if not column_exists(conn, 'public', 'casos_positivos_in', 'fecha'):
    cur = conn.cursor()
    cur.execute('ALTER TABLE casos_positivos_in ADD COLUMN fecha date')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment