Instructions to fix the old code that downloads the CNES dataset and converts it from the proprietary DBC format.

How to download and process br_ms_cnes data (UPDATED)

  1. Python script to download all the DBC files. It might be better to edit it to use ftplib instead of wget; a sketch of that option follows the script.

    from datetime import date
    
    import ftplib
    import wget
    
    def verify_ftp_path(additional_path=""):
        # List the files available in the corresponding folder on the DATASUS FTP server
        additional_path = additional_path + "/"
        with ftplib.FTP("ftp.datasus.gov.br") as ftp:
            ftp.login()
            ftp.dir(f'dissemin/publicos/CNES/200508_/Dados/{additional_path}')
    
    # Parameters that make up the download link
    tipos = ['DC','EE','EF','EP','EQ','GM','HB','IN','LT','PF','RC','SR','ST']
    
    ufs = ['RJ','SP','ES','MG','PR','SC','RS','MS','GO','AC',
           'AL','AP','AM','BA','CE','DF','MA','MT','PA','PB',
           'PE','PI','RN','RO','RR','SE','TO']
    
    # Download only the current month; it is also possible to loop over every year and month.
    today = date.today()
    
    def download():
        # Download loop: one DBC file per file type and state
        for tipo in tipos:
            for uf in ufs:
                link = ("ftp://ftp.datasus.gov.br/dissemin/publicos/CNES/200508_/Dados/"
                        f"{tipo}/{tipo}{uf}{today.strftime('%y%m')}.dbc")
                try:
                    wget.download(link)
                    print(link)
                except Exception:
                    # Some type/state/month combinations are not available on the server
                    pass
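    If you want to drop the wget dependency, here is a rough sketch of the same loop using only ftplib. It reuses the tipos and ufs lists from the script above; the function name download_with_ftplib is just a suggestion.

      import ftplib
      import os
      from datetime import date

      def download_with_ftplib():
          # Same loop as download(), but fetching each file with ftplib instead of wget
          today = date.today()
          with ftplib.FTP("ftp.datasus.gov.br") as ftp:
              ftp.login()
              ftp.cwd("dissemin/publicos/CNES/200508_/Dados")
              for tipo in tipos:
                  for uf in ufs:
                      name = f"{tipo}{uf}{today.strftime('%y%m')}.dbc"
                      try:
                          with open(name, "wb") as f:
                              ftp.retrbinary(f"RETR {tipo}/{name}", f.write)
                          print(name)
                      except ftplib.error_perm:
                          # File not on the server; drop the empty local file
                          os.remove(name)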
  2. Build the blast-dbf C code (the linked fork is the working version) and run it to convert every file from DBC to DBF.

    1. You can run it over every file with a bash one-liner like this:

      for file in *.dbc; do ./blast-dbf "$file" "${file//dbc/dbf}"; done
    2. Or import the C functions in Python, as PySUS does here; a small subprocess-based alternative is sketched below.
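    The following is only a sketch and assumes the compiled blast-dbf executable sits in the current working directory (adjust the path if it lives elsewhere):

      import glob
      import subprocess

      def convert_all_dbc():
          # Run the compiled blast-dbf binary on every DBC file in the current directory
          for dbc in glob.glob("*.dbc"):
              dbf = dbc[:-4] + ".dbf"
              subprocess.run(["./blast-dbf", dbc, dbf], check=True)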

  3. Create a csv directory (the conversion script in step 5 writes into it).

    1. bash

      mkdir csv
    2. python

      import os

      if not os.path.exists("csv"):
          os.mkdir("csv")
  4. Adapt the simpledbf Python lib to accept DBF version 3. Basically, you create a class Dbf3 that inherits from Dbf5 and add a new condition to the _get_recs method.

    The linked issue says to read one byte before opening, but this did not work with the CNES files, so it is not included in the code below.

    import datetime
    import struct
    
    from simpledbf import Dbf5
    
    class Dbf3(Dbf5):
       def __init__(self, dbf, codec='utf-8'):
           super().__init__(dbf, codec)
       
       def _get_recs(self, chunk=None):
           '''Generator that returns individual records.
           Parameters
           ----------
           chunk : int, optional
               Number of records to return as a single chunk. Default 'None',
               which uses all records.
           '''
           if chunk == None:
               chunk = self.numrec
    
           for i in range(chunk):
               # Extract a single record
               record = struct.unpack(self.fmt, self.f.read(self.fmtsiz))
               # If delete byte is not a space, record was deleted so skip
               if record[0] != b' ': 
                   continue  
               
               # Save the column types for later
               self._dtypes = {}
               result = []
               for idx, value in enumerate(record):
                   name, typ, size = self.fields[idx]
                   if name == 'DeletionFlag':
                       continue
    
                   # String (character) types, remove excess white space
                   if typ == "C":
                       if name not in self._dtypes:
                           self._dtypes[name] = "str"
                       value = value.strip()
                       # Convert empty strings to NaN
                       if value == b'':
                           value = self._na
                       else:
                           value = value.decode(self._enc)
                           # Escape quoted characters
                           if self._esc:
                               value = value.replace('"', self._esc + '"')
    
                   # Numeric type. Stored as string
                   elif typ == "N":
                       # A decimal should indicate a float
                       if b'.' in value:
                           if name not in self._dtypes:
                               self._dtypes[name] = "float"
                           value = float(value)
                       # No decimal, probably an integer, but if that fails,
                       # probably NaN
                       else:
                           try:
                               value = int(value)
                               if name not in self._dtypes:
                                   self._dtypes[name] = "int"
                           except:
                               # I changed this for SQL->Pandas conversion
                               # Otherwise floats were not showing up correctly
                               value = float('nan')
    
                   # Date stores as string "YYYYMMDD", convert to datetime
                   elif typ == 'D':
                       try:
                           y, m, d = int(value[:4]), int(value[4:6]), \
                                       int(value[6:8])
                           if name not in self._dtypes:
                               self._dtypes[name] = "date"
                       except:
                           value = self._na
                       else:
                           value = datetime.date(y, m, d)
    
                   # Booleans can have multiple entry values
                   elif typ == 'L':
                       if name not in self._dtypes:
                           self._dtypes[name] = "bool"
                       if value in b'TyTt':
                           value = True
                       elif value in b'NnFf':
                           value = False
                       # '?' indicates an empty value, convert this to NaN
                       else:
                           value = self._na
    
                   # Floating points are also stored as strings.
                   elif typ == 'F':
                       if name not in self._dtypes:
                           self._dtypes[name] = "float"
                       try:
                           value = float(value)
                       except:
                           value = float('nan')
                   elif typ == 'M':
                       value = self._na
    
                   else:
                       err = 'Column type "{}" not yet supported.'
                       raise ValueError(err.format(value))
    
                   result.append(value)
               yield result
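    As a quick sanity check, you can open one of the converted DBF files with the new class and inspect a few rows (the filename below is just an example following step 1's naming pattern; simpledbf's to_dataframe() needs pandas installed):

      dbf = Dbf3("STRJ2203.dbf", codec="iso-8859-1")  # example filename: ST + RJ + yymm
      df = dbf.to_dataframe()
      print(df.head())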
  5. Convert every DBF file to CSV, or adapt the script to write Parquet instead (a Parquet sketch follows the CSV script).

    import os
    
    def convert_to_csv():
        for filename in os.listdir():
            if filename.endswith(".dbf"):
                dbf = Dbf3(filename, codec="iso-8859-1")  # codec specific to this dataset
                dbf.to_csv("csv/" + os.path.splitext(filename)[0] + ".csv")
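    If you prefer Parquet, one possible adaptation (assuming pandas plus pyarrow are installed and reusing the Dbf3 class from step 4; the parquet/ output directory is just a suggestion) is:

      import os

      def convert_to_parquet():
          os.makedirs("parquet", exist_ok=True)
          for filename in os.listdir():
              if filename.endswith(".dbf"):
                  dbf = Dbf3(filename, codec="iso-8859-1")  # codec specific to this dataset
                  df = dbf.to_dataframe()
                  df.to_parquet("parquet/" + os.path.splitext(filename)[0] + ".parquet")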

Requirements:

Python

  • simpledbf
  • wget
  • ftplib (standard library)

C

  • blast-dbf (the fork referenced in step 2)
