Last active
July 1, 2017 11:40
-
-
Save felipeandres254/f8a27c4e01c03317d3cf0939357fa2eb to your computer and use it in GitHub Desktop.
Downloads company information from the Superintendencia de Sociedades website given a Colombian NIT. In collaboration with @cecabrera.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
requests==2.2.1
beautifulsoup4==4.6.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import re, time, datetime, requests | |
from bs4 import BeautifulSoup as BeautifulSoup | |
# Endpoint that download() POSTs the NIT lookup to.
URL = "http://superwas.supersociedades.gov.co/virtuales/buscar.do"

# Labels extracted from the result page. Their order is also the column
# order of every output row (after the leading NIT column).
FIELDS = [
    "RAZON SOCIAL",
    "E-MAIL",
    "REPRESENTANTE LEGAL",
    "CIUDAD",
    "TELÉFONO 1",
    "ACTIVIDAD CIIU",
    "SITUACIÓN",
]
def clean_tag( data ):
    """Normalise a piece of scraped text.

    Line feeds are dropped (not replaced by a space), every run of two or
    more whitespace characters becomes a single space, and the ends are
    trimmed. Falsy input (``None`` or an empty string) yields ``""``.
    """
    if data:
        without_newlines = data.replace("\n", "")
        return re.sub(r"\s{2,}", " ", without_newlines).strip()
    return ""
def download( nit, timeout=30 ):
    """Look up one company by NIT and return its details as an output row.

    Only the first nine characters of *nit* are submitted to the search form.

    Args:
        nit: NIT string, e.g. one stripped line of the input file.
        timeout: Seconds to wait for the server. Without a timeout a stalled
            connection would block the whole batch indefinitely.

    Returns:
        ``[nit, value_1, ..., value_n]`` with one value per entry of
        ``FIELDS`` (``""`` for fields missing from the page), or ``None``
        when the response has no readable title or its title starts with
        "Buscar" (the search page was served again instead of a result).

    Raises:
        requests.exceptions.RequestException: on connection failure or
            timeout.
    """
    response = requests.post(URL, data={"nit": nit[:9]}, timeout=timeout)
    soup = BeautifulSoup(response.text, "html.parser")

    # A page without a <title> (or with an empty/nested one) cannot be
    # recognised as a result page; treat it as "no data" rather than crashing.
    title_tag = soup.title
    title = title_tag.string if title_tag is not None else None
    if title is None or title.startswith("Buscar"):
        return None

    sociedad = {}
    for tag in soup.find_all(class_="titulo"):
        if not tag.string:
            continue
        field = clean_tag(tag.string)
        # Keep only the first occurrence of each wanted label.
        if field not in FIELDS or field in sociedad:
            continue
        # The value is read from the element returned by find_next();
        # clean_tag() maps a missing element or missing text to "".
        value_tag = tag.find_next()
        sociedad[field] = clean_tag(value_tag.string if value_tag is not None else None)

    # Pause after each successful lookup to avoid hammering the server.
    time.sleep(1)
    return [nit] + [sociedad.get(key, "") for key in FIELDS]
def seg2hms( value ):
    """Render a duration given in seconds as text, e.g. 3661 -> "1:01:01"."""
    duration = datetime.timedelta(seconds=value)
    return str(duration)
def print_info( current, progress, total, start, last_nit, last_data ):
    """Redraw the progress panel at the top of the terminal.

    Args:
        current: Number of NITs processed so far.
        progress: Number of NITs for which company data was obtained.
        total: Total number of NITs to process.
        start: Start time of the run, in whole seconds since the epoch.
        last_nit: NIT that was just processed.
        last_data: Row returned for ``last_nit`` (its second item is shown,
            truncated to 100 characters), or ``None`` if nothing was found.
    """
    width = 125  # every line is padded to this width

    # Guard the division: an empty workload reports 0% instead of raising
    # ZeroDivisionError.
    percent = 100*current//total if total else 0
    elapsed = int(time.time()) - start
    # Linear extrapolation of the remaining time from the average time per
    # processed NIT; undefined (shown as 0) before the first one is done.
    estimated = int(elapsed*(total-current)/current) if current else 0

    # "\033[1;1H" is the ANSI sequence that moves the cursor to row 1, col 1.
    print("\033[1;1H" + " "*width)
    print(" # de Sociedades / NITs {:d} / {:d}".format(progress, current).ljust(width, " "))
    print(" Tiempo estimado {}".format(seg2hms(estimated)).ljust(width, " "))
    print(" Progreso total {:d}% de {:d}".format(percent, total).ljust(width, " "))
    print(" "*width)
    if last_data is not None:
        print(" Última sociedad [{}] {}".format(last_nit, last_data[1][:100]).ljust(width, " "))
## MAIN PROGRAM
def _csv_row( values ):
    """Serialise *values* as one ';'-separated line with every field quoted."""
    # Embedded double quotes are doubled so that a value such as
    # 'ACME "X" S.A.' stays inside a single quoted field.
    return "\"" + "\";\"".join(value.replace("\"", "\"\"") for value in values) + "\"\n"


def main():
    """Look up every NIT listed in input.csv and write the hits to output.csv.

    input.csv holds one NIT per line; blank lines are ignored. output.csv
    gets a header row followed by one row per NIT for which download()
    returned data. The progress panel is refreshed after every NIT.
    """
    # Read the input once, before touching output.csv, so a missing input
    # file does not leave a truncated output behind.
    with open("input.csv", "r") as source:
        nits = [line.strip() for line in source]
    nits = [nit for nit in nits if nit]

    current, progress, total, start = 0, 0, len(nits), int(time.time())

    with open("output.csv", "w") as output:
        output.write(_csv_row(["NIT"] + FIELDS))
        output.flush()
        for nit in nits:
            data = download(nit)
            if data is not None:
                output.write(_csv_row(data))
                # Flush per row so results survive an interrupted run.
                output.flush()
                progress += 1
            current += 1
            print_info(current, progress, total, start, nit[:9], data)


# Run only when executed as a script; importing the module must not
# terminate the importing interpreter.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment