Skip to content

Instantly share code, notes, and snippets.

@rburhum
Last active March 7, 2021 15:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rburhum/ded6a78a0ef2824b8477167aa162a0ce to your computer and use it in GitHub Desktop.
Save rburhum/ded6a78a0ef2824b8477167aa162a0ce to your computer and use it in GitHub Desktop.
Procesar Casos positivos de datos abiertos (Perú) MINSA
from datetime import datetime

# Wall-clock timer for the whole pipeline; durations are printed along the way.
start_time = datetime.now()
import os

# Open data portal constants
OPENDATA_URL_CASOS_POSITIVOS = 'https://www.datosabiertos.gob.pe/dataset/casos-positivos-por-covid-19-ministerio-de-salud-minsa'
ALTERNATE_URL_CASOS_POSITIVOS = 'https://cloud.minsa.gob.pe/s/Y8w3wHsEdYQSZRp/download' # alternate URL from MINSA cloud service
DESTINATION_CASOS_POSITIVOS_RAW = './data/casos-positivos-raw.csv'  # as downloaded
CASOS_POSITIVOS_CLEAN = './data/casos-positivos-utf8.csv'  # after encoding cleanup

# Database constants
DESTINATION_CASOS_POSITIVOS = 'casos_positivos' # sanitized table of casos positivos
STATIC_CENSUS_TABLE_NAME = 'censo_peru_distrito' # table from the peruvian census.
LOCAL_DB_NAME = 'covid2'

# Make sure INEI data exists in db, otherwise load it
import psycopg2
conn = psycopg2.connect("dbname=" + LOCAL_DB_NAME)
def table_exists(con, table_str):
    """Return True if a relation named *table_str* exists in the database.

    Probes pg_class, so any relation kind (table, view, index) matches.
    On query failure the error is printed and False is returned.
    """
    exists = False
    try:
        cur = con.cursor()
        # Parameterized query instead of string concatenation: avoids quoting
        # bugs and SQL injection if the name ever comes from outside.
        cur.execute(
            "select exists(select relname from pg_class where relname = %s)",
            (table_str,))
        exists = cur.fetchone()[0]
        cur.close()
    except psycopg2.Error as e:
        print(e)
    return exists
# Bootstrap: load the INEI census FileGDB into PostgreSQL on first run only.
if not table_exists(conn, STATIC_CENSUS_TABLE_NAME):
    print("Loading " + STATIC_CENSUS_TABLE_NAME + "...")
    # IPython shell magic: ogr2ogr imports the district/poverty census layer.
    !ogr2ogr -nln $STATIC_CENSUS_TABLE_NAME -f PostgreSQL PG:"dbname=$LOCAL_DB_NAME" ./data/inei/LIMDISTRITO_IND_POBREZA_CPV17.gdb
    print("...done")
else:
    print(STATIC_CENSUS_TABLE_NAME + " found in local db")
# Drop the bootstrap connection; a fresh one is opened later by
# connect_and_clean_tables().
conn = None
# Pick function to download from alternate portal or minsa cloud link
import requests
def download_from_open_portal():
    """Scrape the open-data portal page for the CSV link and download it.

    Writes the raw CSV to DESTINATION_CASOS_POSITIVOS_RAW. Raises
    requests.HTTPError on a bad HTTP status instead of silently saving an
    error page as the dataset.
    """
    from bs4 import BeautifulSoup
    page = requests.get(OPENDATA_URL_CASOS_POSITIVOS, timeout=60)
    page.raise_for_status()
    soup = BeautifulSoup(page.content, 'html.parser')
    # download data into csv file
    download_section = soup.find_all(class_="btn btn-primary data-link")
    # There can be only one download link unless they changed the page.
    assert len(download_section) == 1
    r = requests.get(download_section[0].get('href'), allow_redirects=True,
                     timeout=300)
    r.raise_for_status()
    # Context manager guarantees the handle is flushed and closed.
    with open(DESTINATION_CASOS_POSITIVOS_RAW, 'wb') as f:
        f.write(r.content)
def download_from_link():
    """Download the CSV directly from the MINSA cloud mirror.

    Writes the raw CSV to DESTINATION_CASOS_POSITIVOS_RAW. Raises
    requests.HTTPError on a bad HTTP status instead of saving an error page.
    """
    r = requests.get(ALTERNATE_URL_CASOS_POSITIVOS, timeout=300)
    r.raise_for_status()
    with open(DESTINATION_CASOS_POSITIVOS_RAW, 'wb') as f:
        f.write(r.content)
# The cloud mirror is the source currently in use; swap the comment to go back
# to scraping the open-data portal page.
download_from_link()
#download_from_open_portal()
print("File Downloaded to " + DESTINATION_CASOS_POSITIVOS_RAW)
# The MINSA data used to arrive with Windows (ISO-8859-1) characters; the iconv
# re-encode step is currently disabled (presumably the feed is UTF-8 now —
# re-enable if accents come out garbled) and the raw file is simply renamed.
#!iconv -f iso-8859-1 -t utf-8 < $DESTINATION_CASOS_POSITIVOS_RAW > $CASOS_POSITIVOS_CLEAN
!mv $DESTINATION_CASOS_POSITIVOS_RAW $CASOS_POSITIVOS_CLEAN
# Load data into postgis: truncate and re-append the CSV into the staging
# table casos_positivos_in (IPython shell magic).
!ogr2ogr -progress -append --config OGR_TRUNCATE YES -nln casos_positivos_in -f PostgreSQL PG:"dbname=$LOCAL_DB_NAME" ./data/casos-positivos-utf8.csv
# print out some statistics
import pandas as pd
import psycopg2
from IPython.display import display, HTML
# Placeholder; reassigned by connect_and_clean_tables() below.
conn = None
def get_duplicate_record_count(conn):
    """Return the number of duplicate groups in casos_positivos.

    A group counts when every column of a row matches another row (the whole
    row is cast to text so it can be grouped).
    """
    cur = conn.cursor()
    sql = "select count(*) from (SELECT (casos_positivos.*)::text, count(*) from casos_positivos group by casos_positivos.* HAVING count(*) > 1) as x"
    cur.execute(sql)
    # Single scalar result; fetchone avoids materializing a result list.
    count = cur.fetchone()[0]
    cur.close()
    return count
def remove_duplicate_records(conn):
    """Delete all but one copy of each duplicated uuid; return rows removed.

    Adds a temporary SERIAL id column so that, among rows sharing a uuid,
    only the one with the highest id survives, then drops the column again.
    """
    cur = conn.cursor()
    cur.execute("ALTER table " + DESTINATION_CASOS_POSITIVOS + " ADD COLUMN id SERIAL")
    # quick trick to remove duplicate rows and keep only one: delete every row
    # that has a same-uuid sibling with a larger id.
    cur.execute(
        "DELETE FROM " + DESTINATION_CASOS_POSITIVOS + " a USING " +
        DESTINATION_CASOS_POSITIVOS + " b WHERE a.id < b.id AND a.uuid = b.uuid")
    rows_removed = cur.rowcount
    cur.execute("ALTER table " + DESTINATION_CASOS_POSITIVOS + " DROP COLUMN id")
    cur.close()
    return rows_removed
def print_type_of_test_summary(conn, title, table_name):
    """Print *title* followed by an HTML table of case counts per test type.

    Groups *table_name* by metododx (PCR / PR / ...) with comma-formatted
    totals, rendered via IPython display.
    """
    print(title)
    # Alias the formatted count so the rendered column header reads "total"
    # instead of the default "to_char"; the stray table alias that was
    # appended after table_name served no purpose and is gone.
    result = pd.read_sql(
        "SELECT metododx as tipo_test, to_char(count(*), 'FM9,999,999') as total FROM " +
        table_name + " group by metododx", conn)
    display(HTML(result.to_html()))
def connect_and_clean_tables():
    """Rebuild the sanitized casos_positivos table from the raw import.

    Opens an autocommit connection, recreates the target table (parsing the
    result date), removes duplicate rows, deletes rows with invalid dates,
    and adds the primary key plus the geometry/census placeholder columns
    that populate_geometry_and_census_info() fills in later.

    Returns:
        The open psycopg2 connection, for the subsequent pipeline steps.
    """
    conn = psycopg2.connect("dbname=" + LOCAL_DB_NAME)
    conn.autocommit = True
    # drop target table if exists — the table is rebuilt from scratch each run
    cur = conn.cursor()
    cur.execute('DROP TABLE IF EXISTS ' + DESTINATION_CASOS_POSITIVOS)
    # Recreate it. fecha_resultado arrives as text in YYYYMMDD form
    # (the feed previously used DD/MM/YYYY).
    sql = " CREATE TABLE " + DESTINATION_CASOS_POSITIVOS + \
          " AS ( SELECT uuid, departamento, provincia, distrito, metododx, edad, sexo," + \
          " TO_DATE(fecha_resultado, 'YYYYMMDD') as fecha_resultado " + \
          " FROM casos_positivos_in )"
    cur.execute(sql)
    print_type_of_test_summary(conn, 'Original Table Summary', 'casos_positivos_in')
    # report duplicate data, then remove it
    duplicate_record_count = get_duplicate_record_count(conn)
    print('Found ' + str(duplicate_record_count) + ' duplicate records')
    if duplicate_record_count > 0:
        records_removed = remove_duplicate_records(conn)
        print('Removed ' + str(records_removed) + ' unnecessary duplicate records')
        assert get_duplicate_record_count(conn) == 0
    print_type_of_test_summary(conn, 'Summary without duplicates - should equal offical #s', DESTINATION_CASOS_POSITIVOS)
    # delete records with invalid dates (TO_DATE failures sentinel value)
    sql = "DELETE FROM " + DESTINATION_CASOS_POSITIVOS + " WHERE fecha_resultado = '0001-01-01 BC'"
    cur.execute(sql)
    print("Deleted " + str(cur.rowcount) + " with invalid fecha_resultado")
    print_type_of_test_summary(conn, 'After cleaning table', DESTINATION_CASOS_POSITIVOS)
    # add primary key
    cur.execute("ALTER TABLE " + DESTINATION_CASOS_POSITIVOS + " ADD COLUMN ogc_fid SERIAL PRIMARY KEY")
    # add place holder for geometries (point for the caso, polygon for its district)
    cur.execute("SELECT AddGeometryColumn ('public','" + DESTINATION_CASOS_POSITIVOS + "','wkb_geometry',4326,'POINT',2);")
    cur.execute("SELECT AddGeometryColumn ('public','" + DESTINATION_CASOS_POSITIVOS + "','shape_district',4326,'MULTIPOLYGON',2);")
    # add fields for census info
    cur.execute("ALTER TABLE " + DESTINATION_CASOS_POSITIVOS + " ADD COLUMN ubigeo varchar(6)")
    cur.execute("ALTER TABLE " + DESTINATION_CASOS_POSITIVOS + " ADD COLUMN pobreza double precision")
    cur.execute("ALTER TABLE " + DESTINATION_CASOS_POSITIVOS + " ADD COLUMN poblacion double precision")
    cur.close()
    return conn


conn = connect_and_clean_tables()
def remove_invalid_distrito(conn):
    """Purge rows whose district is still marked 'EN INVESTIGACIÓN'.

    These rows carry no real district name, so they can never be matched
    against the census table during georeferencing.
    """
    cursor = conn.cursor()
    cursor.execute(
        "DELETE FROM " + DESTINATION_CASOS_POSITIVOS +
        " WHERE distrito = 'EN INVESTIGACIÓN'")
    print(f"Deleted {cursor.rowcount} with invalid distrito")


remove_invalid_distrito(conn)
def print_stats(conn):
    """Render two bar charts of daily case counts: PCR-only, then all tests."""
    pcr_sql = ("select fecha_resultado, count(fecha_resultado) as count "
               "from casos_positivos where metododx = 'PCR' "
               "group by fecha_resultado")
    pd.read_sql(pcr_sql, conn).plot.bar(
        x='fecha_resultado', y='count', figsize=(40, 10),
        title='Casos positivos por PCR')
    todos_sql = ("select fecha_resultado, count(fecha_resultado) as count "
                 "from casos_positivos group by fecha_resultado")
    pd.read_sql(todos_sql, conn).plot.bar(
        x='fecha_resultado', y='count', figsize=(40, 10),
        title='Casos positivos por PCR y PR')


print_stats(conn)
# georeference casos
def populate_geometry_and_census_info(conn):
    """Normalize INS place-name spellings to match the INEI census, then join
    census geometry and statistics onto every caso.

    The original code repeated the same UPDATE/print stanza 14 times; the
    corrections now live in one table and a single loop, with the log output
    preserved verbatim. Calls exit(1) if any record fails to georeference.
    """
    cur = conn.cursor()
    print("Cleaning data...")
    # Each entry: (column to fix, INEI spelling, INS spelling,
    #              departamento filter or None, label used in the log line).
    # Labels intentionally reproduce the original log text, typos included.
    # NOTE: INEI really does spell this first district with an underscore
    # ¯\_(ツ)_/¯
    fixes = [
        ('distrito', 'ANCO_HUALLO', 'ANCO HUALLO', None,
         'ANCO HUALLO'),
        ('distrito', 'SAN FRANCISCO DE ASIS DE YARUSYACAN',
         'SAN FCO DE ASIS DE YARUSYACAN', None,
         'SAN FCO DE ASIS DE YARUSYACAN'),
        ('distrito', 'HUAY-HUAY', 'HUAY HUAY', None,
         'HUAY HUAY'),
        ('provincia', 'ANTONIO RAYMONDI', 'ANTONIO RAIMONDI', 'ANCASH',
         'ANTONIO RAIMONDI'),
        ('distrito', 'ANDRES AVELINO CACERES DORREGARAY',
         'ANDRES AVELINO CACERES D.', 'AYACUCHO',
         'ANDRES AVELINO CACERES D.'),
        ('distrito', 'CARMEN DE LA LEGUA REYNOSO',
         'CARMEN DE LA LEGUA-REYNOSO', 'CALLAO',
         'CARMEN DE LA LEGUA-REYNOSO'),
        ('distrito', 'NASCA', 'NAZCA', 'ICA',
         'NAZCA (distrito)'),
        ('provincia', 'NASCA', 'NAZCA', 'ICA',
         'NAZCA (provincia)'),
        ('distrito', 'SAN PEDRO DE PUTINA PUNCO',
         'SAN PEDRO DE PUTINA PUNCU', 'PUNO',
         'SAN PEDRO DE PUTINA PUNCU'),
        ('distrito', 'CORONEL GREGORIO ALBARRACIN LANCHIPA',
         'CORONEL GREGORIO ALBARRACIN L.', 'TACNA',
         'COREONEL GREGORIO ALBARRICIN'),
        ('distrito', 'ESTIQUE-PAMPA', 'ESTIQUE PAMPA', None,
         'ESTIQUE PAMPA'),
        ('departamento', 'LIMA', 'LIMA REGION', None,
         'LIMA REGION'),
        ('distrito', 'SAN FRANCISCO DEL YESO', 'SAN FRANCISCO DE YESO', None,
         'SAN FRANCISCO DE YESO'),
        ('distrito', 'QUITO-ARMA', 'QUITO ARMA', None,
         'QUITO ARMA'),
    ]
    for column, inei_name, ins_name, departamento, label in fixes:
        # Column names come from the literal table above (never user input);
        # the values are passed as query parameters.
        sql = ("UPDATE " + DESTINATION_CASOS_POSITIVOS +
               " SET " + column + " = %s WHERE " + column + " = %s")
        params = [inei_name, ins_name]
        if departamento is not None:
            sql += " AND departamento = %s"
            params.append(departamento)
        cur.execute(sql, params)
        print("Cleaned " + str(cur.rowcount) + " with '" + label + "'")
    print("Georeferencing data...")
    # use census to georeference casos and load census data
    sql = "UPDATE " + DESTINATION_CASOS_POSITIVOS + " casos " + \
          "SET wkb_geometry = ST_PointOnSurface(censo.shape), " + \
          "shape_district = censo.shape, " + \
          "ubigeo = censo.ubigeo, " + \
          "pobreza = censo.pobreza, " + \
          "poblacion = censo.p_001 " + \
          " FROM " + STATIC_CENSUS_TABLE_NAME + \
          " censo WHERE casos.departamento = censo.nombdep AND " + \
          "casos.provincia = censo.nombprov AND casos.distrito = censo.nombdist"
    cur.execute(sql)
    print("Verifying georeferenced data")
    # find if any records were not referenced
    sql = "SELECT count(*) FROM " + DESTINATION_CASOS_POSITIVOS + \
          " WHERE wkb_geometry is null"
    cur.execute(sql)
    records_not_referenced = int(cur.fetchone()[0])
    if records_not_referenced > 0:
        print("Records not georeferenced: " + str(records_not_referenced))
        # Abort the pipeline: a new misspelling needs a new entry in `fixes`.
        exit(1)
    else:
        print("All records georeferenced succesfully")


populate_geometry_and_census_info(conn)
end_time = datetime.now()
print('Duration so far: {}'.format(end_time - start_time))
# Run the post-processing SQL script (builds the aggregated result tables).
# Context manager replaces the old "assign None" pattern, guaranteeing the
# file handle is closed even if execute() raises.
with open('process.sql', 'r') as postprocess_file:
    cur = conn.cursor()
    cur.execute(postprocess_file.read())
    cur.close()
end_time = datetime.now()
print('Duration total after queries: {}'.format(end_time - start_time))
def vacuum():
    """Run VACUUM ANALYZE on the database via the module-level `conn`.

    VACUUM cannot run inside a transaction block, so the connection is
    switched to autocommit (isolation level 0) for the duration and the
    previous isolation level is restored afterwards — now in a finally
    clause, so an error during VACUUM no longer leaves the connection
    stuck in autocommit.
    """
    old_isolation_level = conn.isolation_level
    conn.set_isolation_level(0)
    try:
        cur = conn.cursor()
        cur.execute("VACUUM ANALYZE")
        cur.close()
    finally:
        conn.set_isolation_level(old_isolation_level)


vacuum()
end_time = datetime.now()
print('Duration total after queries: {}'.format(end_time - start_time))
# Create a folder called ./data/inei and put the contents of this archive there:
https://www.dropbox.com/t/NdMoLpvp0kUBl1ZY
-- Weekly case counts per district (polygon geometry), with census poverty and
-- population, plus cases per 100k inhabitants.
-- IF EXISTS keeps the script runnable on a fresh database.
DROP TABLE IF EXISTS result_table;
CREATE TABLE result_table as (
SELECT weekly, shape_district as wkb_geometry, round(sum(resultado_count)::numeric,2) as casos_por_semana,
ubigeo, pobreza, poblacion, ((sum(resultado_count)/poblacion) * 100000) as casos_por_poblacion
FROM ( SELECT date_trunc('week', fecha_resultado) as weekly, shape_district,
count(fecha_resultado) as resultado_count,ubigeo, pobreza, poblacion
FROM casos_positivos
GROUP BY fecha_resultado, shape_district, ubigeo, pobreza, poblacion
ORDER BY fecha_resultado) as x
GROUP BY weekly, shape_district, ubigeo, pobreza, poblacion
ORDER BY weekly);
ALTER TABLE result_table ADD COLUMN ogc_fid SERIAL PRIMARY KEY;
ALTER TABLE result_table ADD COLUMN departamento VARCHAR;
ALTER TABLE result_table ADD COLUMN provincia VARCHAR;
ALTER TABLE result_table ADD COLUMN distrito VARCHAR;
-- Back-fill the place names from the census table, keyed on ubigeo.
-- (nombdist now carries the c. prefix, consistent with the other UPDATEs.)
UPDATE result_table as t set departamento = c.nombdep, provincia = c.nombprov, distrito = c.nombdist FROM censo_peru_distrito as c WHERE c.ubigeo = t.ubigeo;
CREATE INDEX result_table_idx on result_table USING GIST(wkb_geometry);
-- Same weekly aggregation, but keyed on the district's point geometry
-- (wkb_geometry) instead of its polygon — for point-based visualizations.
DROP TABLE IF EXISTS result_table_point;
CREATE TABLE result_table_point as (
SELECT weekly, wkb_geometry, round(sum(resultado_count)::numeric,2) as casos_por_semana,
ubigeo, pobreza, poblacion, ((sum(resultado_count)/poblacion) * 100000) as casos_por_poblacion
FROM ( SELECT date_trunc('week', fecha_resultado) as weekly, wkb_geometry,
count(fecha_resultado) as resultado_count,ubigeo, pobreza, poblacion
FROM casos_positivos
GROUP BY fecha_resultado, wkb_geometry, ubigeo, pobreza, poblacion
ORDER BY fecha_resultado) as x
GROUP BY weekly, wkb_geometry, ubigeo, pobreza, poblacion
ORDER BY weekly);
ALTER TABLE result_table_point ADD COLUMN ogc_fid SERIAL PRIMARY KEY;
ALTER TABLE result_table_point ADD COLUMN departamento VARCHAR;
ALTER TABLE result_table_point ADD COLUMN provincia VARCHAR;
ALTER TABLE result_table_point ADD COLUMN distrito VARCHAR;
-- Back-fill the place names from the census table, keyed on ubigeo.
-- (nombdist now carries the c. prefix, consistent with the other UPDATEs.)
UPDATE result_table_point as t set departamento = c.nombdep, provincia = c.nombprov, distrito = c.nombdist FROM censo_peru_distrito as c WHERE c.ubigeo = t.ubigeo;
CREATE INDEX result_table_point_idx on result_table_point USING GIST(wkb_geometry);
-- PCR-only variant of the weekly point aggregation. Note: unlike the other
-- two tables this one ROUNDs casos_por_poblacion and builds no GIST index —
-- presumably intentional; confirm before relying on spatial queries here.
DROP TABLE IF EXISTS result_table_pcr;
CREATE TABLE result_table_pcr as (
SELECT weekly, wkb_geometry, round(sum(resultado_count)::numeric,2) as casos_por_semana,
ubigeo, pobreza, poblacion, ROUND((sum(resultado_count)/poblacion) * 100000) as casos_por_poblacion
FROM ( SELECT date_trunc('week', fecha_resultado) as weekly, wkb_geometry,
count(fecha_resultado) as resultado_count,ubigeo, pobreza, poblacion
FROM casos_positivos
WHERE metododx = 'PCR'
GROUP BY fecha_resultado, wkb_geometry, ubigeo, pobreza, poblacion
ORDER BY fecha_resultado) as x
GROUP BY weekly, wkb_geometry, ubigeo, pobreza, poblacion
ORDER BY weekly);
ALTER TABLE result_table_pcr ADD COLUMN ogc_fid SERIAL PRIMARY KEY;
ALTER TABLE result_table_pcr ADD COLUMN departamento VARCHAR;
ALTER TABLE result_table_pcr ADD COLUMN provincia VARCHAR;
ALTER TABLE result_table_pcr ADD COLUMN distrito VARCHAR;
-- Back-fill the place names from the census table, keyed on ubigeo.
UPDATE result_table_pcr as t set departamento = c.nombdep, provincia = c.nombprov, distrito = c.nombdist FROM censo_peru_distrito as c WHERE c.ubigeo = t.ubigeo;
-- Total cases per ubigeo summed over all weeks, written onto every weekly row
-- for that district.
ALTER TABLE result_table ADD COLUMN casos_acumulados NUMERIC;
UPDATE result_table as r set casos_acumulados = s.casos_acumulados FROM (select ubigeo, sum(casos_por_semana) as casos_acumulados from result_table group by ubigeo) as s where r.ubigeo = s.ubigeo;
jupyterlab
beautifulsoup4
pandas
psycopg2
matplotlib
#jupyterlab-lsp
def column_exists(conn, schema_name, table_name, column_name):
    """Return True if *column_name* exists on schema.table.

    NOTE(review): reconstructed from a garbled snippet — the original text was
    syntactically broken. Intent appears to be an information_schema EXISTS
    probe guarding a conditional ALTER TABLE; confirm against the notebook.
    """
    cur = conn.cursor()
    # Parameterized query instead of the original string concatenation.
    cur.execute(
        "SELECT EXISTS (SELECT 1 FROM information_schema.columns "
        "WHERE table_schema = %s AND table_name = %s AND column_name = %s)",
        (schema_name, table_name, column_name))
    found = cur.fetchone()[0]
    cur.close()
    return found


# Add the 'fecha' column to the staging table only when it is missing.
if not column_exists(conn, 'public', 'casos_positivos_in', 'fecha'):
    cur = conn.cursor()
    cur.execute('ALTER TABLE casos_positivos_in ADD COLUMN fecha date')
    cur.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment