Skip to content

Instantly share code, notes, and snippets.

@arieltorti
Last active November 8, 2019 16:17
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arieltorti/bcc435e517a5f0fde7f4104980f07bb8 to your computer and use it in GitHub Desktop.
Save arieltorti/bcc435e517a5f0fde7f4104980f07bb8 to your computer and use it in GitHub Desktop.
PASO 2019 Scraper
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Author: Ariel Torti
# @GitHub: github.com/maks500
import argparse
import logging
import os
import shutil
import sys
import time
import itertools
from threading import Thread, Lock
import requests
from queue import Queue
logger = logging.getLogger(__name__)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
BASE_DIR = os.path.dirname(os.path.realpath(__file__))
MAX_RETRIES = 5
def _client_blocked():
return requests.get('https://resultados2019.gob.ar').status_code == 403
class PasoScrapper:
"""The election process is divided intro 6 hierarchical levels.
1 - Country. Not needed here.
2 - Province.
3 - Department.
4 - Circuit.
5 - Region.
6 - Precinct.
We can get the first 4 levels from a single file. From then on we have
to request info for each region inside each circuit, and each precinct inside
each region.
"""
PROVINCE, DEPARTMENT, CIRCUIT, REGION, PRECINCT = range(2, 7)
STARTING_LEVEL = PROVINCE
LAST_LEVEL = CIRCUIT
LEVELS_TO_CC = [0, 2, 5, 11]
def __init__(self, threads, output, flat=False, info_thread=False, corrupt_file_threshold=150000):
self.result_name = output
self.results_dir = os.path.join(BASE_DIR, self.result_name)
self.flat = flat
self.sites_base_url = 'https://resultados2019.gob.ar/assets/data/{}'
self.image_base_url = 'https://resultados2019.gob.ar/opt/jboss/rct/tally/pages/{}/{}.png'
self.initial_json = None
self.cc_to_folder = {}
self.region_queue = Queue()
self.telegram_queue = Queue()
self.THREADS = threads
self.failed = 0
self.failed_lock = Lock()
self.info_thread = info_thread
self.downloaded = 0
self.downloaded_lock = Lock()
self.already_downloaded = 0
self.already_downloaded_lock = Lock()
# Lets be a little conservative here, better to have
# a false positive than a false negative.
self.CORRUPT_FILE_SIZE_THRESHOLD = corrupt_file_threshold
def requests_information_thread_logger(self):
SLEEP_TIME = 60*5
old_failed = old_downloaded = old_already_downloaded = 0
while True:
print("[x] {} failed requests".format(self.failed))
print("[x] {} succeded requests".format(self.downloaded))
print("[x] {} skipped images".format(self.already_downloaded))
failed_diff = self.failed - old_failed
print("[x] {} New failed requests."
" {:.2f} p/min".format(failed_diff, failed_diff/SLEEP_TIME*60))
downloaded_diff = self.downloaded - old_downloaded
print("[x] {} New downloaded images."
" {:.2f} p/min".format(downloaded_diff, downloaded_diff/SLEEP_TIME*60))
already_downloaded_diff = self.already_downloaded - old_already_downloaded
print("[x] {} New skipped images."
" {:.2f} p/min".format(already_downloaded_diff, already_downloaded_diff/SLEEP_TIME*60))
# Bear in mind that as we are not using any locks here the values may be
# slightly wrong.
old_failed = self.failed
old_downloaded = self.downloaded
old_already_downloaded = self.already_downloaded
time.sleep(SLEEP_TIME)
def start(self):
if self.info_thread:
t = Thread(target=self.requests_information_thread_logger)
t.daemon = True
t.start()
# This json contains province, department and circuit data.
print("[x] Fetching initial data")
response = self.handle_request('https://resultados2019.gob.ar/assets/data/regions.json')
self.initial_json = response.json()
self.create_result_dir()
self._start()
def create_result_dir(self):
logger.info("Creating results dir...")
if os.path.exists(self.result_name):
response = None
while not response:
print("A folder named {} already exists, want to remove it ? (Y/n)".format(self.result_name), end=' ')
response = input()
if response[0].lower() == 'y':
shutil.rmtree(self.result_name)
os.makedirs(self.result_name, exist_ok=True)
os.chdir(self.result_name)
def _start(self):
if not self.flat:
self.create_directory_structure()
print("[x] Requesting regions...")
self.get_regions()
def create_directory_structure(self):
# Create province, department and circuit folders.
logger.info("Creating directory structure")
for level in range(self.STARTING_LEVEL, self.LAST_LEVEL + 1):
self.create_directories([x for x in self.initial_json if x['l'] == level], level)
def create_directories(self, iterable, level):
for node in iterable:
os.chdir(self.get_folder_from_cc(node['cc'], level))
name = self.clean(node['n'].capitalize())
self.cc_to_folder[node['cc']] = os.path.join(os.getcwd(), name)
os.makedirs(name, exist_ok=True)
def get_folder_from_cc(self, cc, level):
if self.flat:
return self.results_dir
bits = cc[:self.LEVELS_TO_CC[level - 2]]
if bits:
return os.path.join(self.results_dir, self.cc_to_folder[bits])
return '.'
def get_regions(self):
# In order to get the regions we need to gather info from circuits
for i in range(self.THREADS):
t = Thread(target=self.circuit_worker)
t.daemon = True
t.start()
t = Thread(target=self.telegram_worker)
t.daemon = True
t.start()
try:
for x in self.initial_json:
if x['l'] == self.CIRCUIT:
tuple_ = (x, 0)
self.region_queue.put(tuple_)
self.region_queue.join()
self.telegram_queue.join()
except KeyboardInterrupt:
sys.exit(1)
def circuit_worker(self):
while True:
circuit, retries = self.region_queue.get()
try:
circuit_id = circuit['cc']
# Each of this is the site ID, each site has many telegrams ID.
sites = circuit['chd']
for site in sites:
pre_cc = site[:-3] or '0'
url = "precincts/{}/s{}.json".format(pre_cc, site)
self.parse_site(self.sites_base_url.format(url), circuit_id)
self.region_queue.task_done()
except Exception:
logger.debug("Thread exception ocurred with argument {}".format(circuit), exc_info=1)
# Put item back into the queue for retrying
if retries <= 10:
retries += 1
self.region_queue.put((circuit, retries))
else:
logger.error("Ignoring {} failed to many times.".format(circuit))
def parse_site(self, url, circuit_cc):
response = self.handle_request(url)
if response.status_code != 200:
if self.info_thread:
self.failed_lock.acquire()
self.failed += 1
self.failed_lock.release()
logger.error("{} returned {}".format(response.url, response.status_code))
else:
response_json = response.json()
for obj in response_json:
try:
self.telegram_queue.put((obj['cc'], circuit_cc, 0)) # This is the image ID.
except KeyboardInterrupt:
sys.exit(1)
def telegram_worker(self):
while True:
image, circuit_cc, retries = self.telegram_queue.get()
try:
for i in itertools.count(1):
image_folder, image_path = self.create_image_path(image, circuit_cc, i)
if self.should_download_image(image_path):
logger.debug('Saving image {} in {}'.format(image, image_folder))
response = self.handle_request(self.image_base_url.format(image, i), stream=True)
# If we recieve 403, then the image doesn't exist and we stop searching for
# higher numbers.
if response.status_code == 403:
break
if response.status_code == 200:
with open(image_path, 'wb') as f:
for chunk in response.iter_content(1024):
f.write(chunk)
if self.info_thread:
self.downloaded_lock.acquire()
self.downloaded += 1
self.downloaded_lock.release()
else:
logger.error("There was an error downloading the image {}".format(image))
elif self.info_thread:
self.already_downloaded_lock.acquire()
self.already_downloaded += 1
self.already_downloaded_lock.release()
self.telegram_queue.task_done()
except Exception:
logger.debug("Thread exception ocurred with argument {}".format(image), exc_info=1)
if retries <= 10:
# Put item back into the queue for retrying
retries += 1
self.telegram_queue.put((image, circuit_cc, retries))
else:
logger.error("Ignoring {} failed to many times.".format(image))
def create_image_path(self, image, circuit_cc, index):
if self.flat:
folder = self.results_dir
path = os.path.join(folder, '{}-{}.png'.format(image, index))
return folder, path
folder = os.path.join(self.cc_to_folder[circuit_cc], image)
path = os.path.join(folder, "{}.png".format(index))
os.makedirs(folder, exist_ok=True)
return folder, path
def should_download_image(self, image_path):
try:
file_size = os.path.getsize(image_path)
except FileNotFoundError:
file_size = 0
return file_size < self.CORRUPT_FILE_SIZE_THRESHOLD
@staticmethod
def handle_request(url, **kwargs):
# Retry request 5 times if blocked, sleeping in the meantime
logger.debug("Requesting url: {}".format(url))
for _ in range(MAX_RETRIES):
try:
response = requests.get(url, **kwargs)
except requests.ConnectionError:
time.sleep(2)
continue
except Exception:
logger.debug("Exception ocurred while requesting url {}".format(url), exc_info=1)
if response.status_code == 200:
return response
if response.status_code == 403 and _client_blocked():
# Blocked by Cloudflare, sleep for 4 minutes before retrying.
logger.info("Request {} blocked, sleeping and retrying...".format(url))
time.sleep(60*4)
continue
return response
# What to return here ?
raise requests.ConnectionError
@staticmethod
def clean(name):
return name.replace('/', '-')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Elecciones PASO 2019 Scrapper.')
parser.add_argument('--threads', type=int, default=10,
help='Number of threads.')
parser.add_argument('--cft', type=int, default=150000, dest='corrupt_file_threshold',
help='Corrupt File Threshold. If an image file is smaller than this\
it will be redownloaded.')
parser.add_argument('--output', '-o', default='results',
help='Output folder.')
parser.add_argument('--info', action='store_true', dest='info_thread',
help='Periodically show requests information.')
parser.add_argument('--flat', '-f', action='store_true',
help='Save all the images in the result folder, no subfolders.')
parser.add_argument('--verbose', '-v', action='count', default=1,
help='Verbosity level.')
args = parser.parse_args()
# Logger configuration
args = vars(args)
verbosity = args.pop('verbose')
if verbosity >= 3:
logger.setLevel(logging.DEBUG)
elif verbosity >= 2:
logger.setLevel(logging.INFO)
PasoScrapper(**args).start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment