Created
December 31, 2022 16:57
-
-
Save ayharano/d9ca343c5e7daeedc1d069f227be30eb to your computer and use it in GitHub Desktop.
rj_campos_dos_goytacazes.py with tentative end_date search
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import calendar | |
import re | |
from datetime import date, timedelta | |
from string import punctuation | |
import dateparser | |
from scrapy import Request | |
from gazette.items import Gazette | |
from gazette.spiders.base import BaseGazetteSpider | |
# Captures the edition number from row texts such as "Edição 123",
# "Edição Extra - 123" or "Edição Extra 123".
EDITION_NUMBER_RE = re.compile(r"Edição\s+(?:Extra\s+)?-?\s*(\d+)")

# Full Portuguese nominal date, e.g. "14 de Agosto de 2021":
# group 1 = day, group 2 = month name (letters only), group 3 = 4-digit year.
# The "d?e?" parts make the "de" connectors optional.
REGULAR_FULL_NOMINAL_DATE_RE = re.compile(
    r"\s+"
    r"(\d{1,2})(?# day)"
    r"\s+d?e?\s*"
    rf"([^\d\s{punctuation}]+)(?# nominal month in pt)"
    r"\s+d?e?\s*"
    r"(\d{4})(?# year)",
    flags=re.IGNORECASE,
)

# Month-only rows, e.g. "... Oficial de Outubro de 2012":
# group 1 = month name, group 2 = 4-digit year (no day present).
MONTH_YEAR_NOMINAL_DATE_RE = re.compile(
    r"Oficial\s+de\s*"
    rf"([^\d\s{punctuation}]+)(?# nominal month in pt)"
    r"\s+d?e?\s*"
    r"(\d{4})(?# year)",
    flags=re.IGNORECASE,
)
class DetermineEndDatePageMixin:
    """Collection of attributes and methods to determine the end_date page"""

    # Paginated gazette listing; PGpagina selects the page, page 1 holds the
    # most recent gazettes.
    BASE_URL = (
        "https://www.campos.rj.gov.br/diario-oficial.php"
        "?PGpagina={PAGE_NUMBER}&PGporPagina=15"
    )
    # the current gazette system only allows at most this number of rows per page,
    # even when explicitly requesting more than that
    MAX_ROWS_PER_PAGE = 15
    # Page numbering on the site starts at 1.
    MINIMUM_ALLOWED_PAGE_NUMBER = 1

    def calculate_tentative_page_number_associated_with_end_date(self):
        """Determine the page number that the end_date gazette might be at.

        Facts for the design of this method:
        - The first page of the pagination contains the most recent gazette.
        - We consider most Saturday and Sunday days have no gazette.
          Exception example:
              Diário Oficial Eletrônico de 14 de Agosto de 2021 - Edição Extra
        - Even if the number of rows for the other days may vary from zero to
          more than one, we consider that non-Saturday and non-Sunday days
          will have one gazette
        - Considering the potential variation of established conditions,
          such as not having a gazette or having multiple rows for the same day,
          we tentatively set that the target end_date gazette might be available on
          the calculated page number or one before that.

        This method adopts the following heuristic: we calculate the number of
        non-Saturday and non-Sunday from the day this method runs until
        the target end_date and perform an integer division of the estimated number of
        days by the maximum number of rows on a page, and the result is
        the chosen page number.
        We only replace the calculated number when it is less than one:
        for that case, we replace it with 1, as the page numbering begins at 1.

        It returns a non-zero positive int.
        """
        today = date.today()
        # end_date is today or in the future: only the first page can hold it.
        if today <= self.end_date:
            return self.MINIMUM_ALLOWED_PAGE_NUMBER
        non_saturday_nor_sunday_day_count = 0
        current_day = self.end_date
        one_day_timedelta = timedelta(days=1)
        # isoweekday(): Monday is 1 ... Sunday is 7.
        saturday_and_sunday_set = {
            6,  # Saturday
            7,  # Sunday
        }
        # Count non-weekend days from end_date through today, inclusive.
        while current_day <= today:
            if current_day.isoweekday() not in saturday_and_sunday_set:
                non_saturday_nor_sunday_day_count += 1
            current_day = current_day + one_day_timedelta
        self.logger.info(
            f"Number of non-Saturday and non-Sunday days from {self.end_date} to"
            f" {today}, inclusive: {non_saturday_nor_sunday_day_count}"
        )
        tentative_page_number = (
            non_saturday_nor_sunday_day_count // self.MAX_ROWS_PER_PAGE
        )
        if tentative_page_number < self.MINIMUM_ALLOWED_PAGE_NUMBER:
            tentative_page_number = self.MINIMUM_ALLOWED_PAGE_NUMBER
            self.logger.info(
                f"Tentative page number for {self.end_date} has defaulted to"
                f" {self.MINIMUM_ALLOWED_PAGE_NUMBER}"
            )
        else:
            self.logger.info(
                f"Tentative page number for {self.end_date} calculated as"
                f" {tentative_page_number}"
            )
        return tentative_page_number

    def extract_date_from_regular_full_nominal_date(self, match_):
        """Convert a REGULAR_FULL_NOMINAL_DATE_RE match into a datetime.date.

        Rebuilds a canonical "<day> de <month> de <year>" string so dateparser
        sees the connectors even when the original text omitted them.
        """
        textual_date = f"{match_.group(1)} de {match_.group(2)} de {match_.group(3)}"
        # NOTE(review): dateparser.parse returns None for unparseable text,
        # which would raise AttributeError on .date() — presumably the regex
        # match guarantees parseable input; confirm.
        gazette_date = dateparser.parse(textual_date, languages=["pt"]).date()
        return gazette_date

    def extract_date_from_month_year_nominal_date(self, match_):
        """Convert a MONTH_YEAR_NOMINAL_DATE_RE match into a datetime.date.

        Returns the last day of the matched month, as these rows bundle the
        gazettes of a whole month.
        """
        # To avoid any issues for the date conversion, we do a safe replacement to
        # initially consider as the first day of the month
        textual_date = f"01 de {match_.group(1)} de {match_.group(2)}"
        gazette_date = dateparser.parse(textual_date, languages=["pt"]).date()
        # As this case is a collection of gazettes for the full month,
        # we consider the gazette date as the last day of that month
        last_day_of_the_month = calendar.monthrange(
            year=gazette_date.year, month=gazette_date.month
        )[1]
        gazette_date = gazette_date.replace(day=last_day_of_the_month)
        return gazette_date

    def extract_date_from_gazette_text(self, gazette_text):
        """Extract the gazette date from a row's text.

        Returns a datetime.date, or None when the text is empty or no known
        date pattern matches.
        """
        if not gazette_text:
            return None
        text = (
            gazette_text
            # The extra edition for August 28th, 2018 has a typo in the month name.
            .replace("Agosoto", "Agosto")
            # The edition for December 17th, 2012 has a typo in the month name.
            .replace("Dezembrbo", "Dezembro")
        )
        if match_ := REGULAR_FULL_NOMINAL_DATE_RE.search(text):
            return self.extract_date_from_regular_full_nominal_date(match_)
        # From October 2012 to October 2013, it has a single row per month
        # The provided data is a rar extension file and some of them are missing
        if match_ := MONTH_YEAR_NOMINAL_DATE_RE.search(text):
            return self.extract_date_from_month_year_nominal_date(match_)
        self.logger.warning(f"No date could be extracted from '{text}'")
        return None

    def validate_date_condition_over_rows(
        self,
        remaining_rows_for_current_page,
        condition,
    ):
        """Return the first extracted gazette date for which ``condition`` holds.

        NOTE(review): when the condition is never satisfied, this does NOT
        return None as one might expect — it returns the LAST successfully
        extracted date (or None only when no row yielded a valid date).
        Callers such as find_end_date_page rely on that actual behavior.

        The rows argument is an iterator and is consumed as far as the search
        goes, so the caller can continue iterating the remaining rows.
        """
        gazette_date = None
        for row_element in remaining_rows_for_current_page:
            gazette_text = row_element.css("h4::text").get("").strip()
            if not gazette_text:
                continue
            gazette_date = self.extract_date_from_gazette_text(gazette_text)
            if not gazette_date:
                continue
            if condition(gazette_date):
                return gazette_date
        return gazette_date

    def find_first_valid_date_in_rows(self, remaining_rows_for_current_page):
        """Return the first row date that could be extracted, or None."""
        return self.validate_date_condition_over_rows(
            remaining_rows_for_current_page, condition=lambda date_: date_ is not None
        )

    def find_end_date_or_earlier_in_rows(self, remaining_rows_for_current_page):
        """Return the first row date that is end_date or earlier.

        See validate_date_condition_over_rows for what is returned when no
        such date exists among the rows.
        """
        return self.validate_date_condition_over_rows(
            remaining_rows_for_current_page,
            condition=lambda date_: date_ <= self.end_date,
        )

    def find_end_date_page(
        self,
        response,
        current_page_number,
        search_towards_the_past,
    ):
        """Determine the page for end_date.

        In most cases, the tentative_page_number_associated_with_end_date strategy is
        a conservative approach, as at 2022-10-05 there are consistently one or more gazettes
        per non-Saturday and non-Sunday weekdays. It means that to determine the page for
        end_date we will have to browse greater page numbers to find the actual page associated with end_date.
        However, we might face a period when, for any reason, the gazettes were not published as expected.
        A fallback strategy consists in browsing pages with lower page number than
        the tentative_page_number_associated_with_end_date value.

        We control the direction of the search by using the `search_towards_the_past` parameter:
        - None: we still need to determine which way we need to find the end_date page
        - False: tentative_page_number_associated_with_end_date strategy failed due to having more
          days without any published gazettes.
        - True: tentative_page_number_associated_with_end_date strategy worked, and we need to find
          end_date in pages associated with earlier gazettes.
        """
        was_end_date_page_found = False
        # Wrap in iter() so the helper calls below consume the rows
        # incrementally and later calls resume where earlier ones stopped.
        remaining_rows_for_current_page = iter(response.css("ul.ul-licitacoes li"))
        gazette_date = self.find_first_valid_date_in_rows(
            remaining_rows_for_current_page
        )
        if not gazette_date:
            raise ValueError(f"No valid dates were found for this page: {response.url}")
        if not search_towards_the_past:
            # This case is valid for both when search_towards_the_past is None and when it is False
            if gazette_date < self.end_date:
                # First valid date is earlier than end_date
                if current_page_number <= self.MINIMUM_ALLOWED_PAGE_NUMBER:
                    # We already reached the most recent page, so we will start triaging the data
                    was_end_date_page_found = True
                    self.logger.info("Page with the most recent gazettes reached")
                else:
                    # We need to retrieve pages associated with newer gazettes
                    search_towards_the_past = False
            else:
                search_towards_the_past = True
        if (
            search_towards_the_past
        ):  # This condition should not be joined with the one above
            was_end_date_page_found = gazette_date <= self.end_date
            if not was_end_date_page_found:
                # NOTE(review): this may return None when the remaining rows
                # carry no valid dates, which would make the comparison below
                # raise TypeError — confirm pages always have parseable rows.
                gazette_date = self.find_end_date_or_earlier_in_rows(
                    remaining_rows_for_current_page
                )
                was_end_date_page_found = gazette_date <= self.end_date
        if was_end_date_page_found:
            # As the page was found, we finally start triaging the data
            self.logger.info(
                f"{gazette_date.isoformat()} was the closest date to end_date"
                f" {self.end_date.isoformat()}, and it was found on page"
                f" {current_page_number}. Starting gazette collection."
            )
            for from_end_date_backward in self.triage_data_per_page(response):
                yield from_end_date_backward
        else:
            # Direction must have been decided by now.
            assert search_towards_the_past is not None
            if search_towards_the_past:
                # Older gazettes live on higher page numbers.
                next_call_page_number = current_page_number + 1
                self.logger.info(
                    f"{gazette_date.isoformat()} was the earliest date found on page"
                    f" {current_page_number}. Searching the closest date to end_date"
                    f" {self.end_date} on page {next_call_page_number}."
                )
            else:
                # Newer gazettes live on lower page numbers.
                next_call_page_number = current_page_number - 1
                self.logger.info(
                    f"{gazette_date.isoformat()} was the most recent date found on page"
                    f" {current_page_number}. Searching the closest date to end_date"
                    f" {self.end_date} on page {next_call_page_number}."
                )
            yield Request(
                url=self.BASE_URL.format(PAGE_NUMBER=next_call_page_number),
                callback=self.find_end_date_page,
                cb_kwargs={
                    "current_page_number": next_call_page_number,
                    "search_towards_the_past": search_towards_the_past,
                },
            )
class RjCamposDosGoytacazesSpider(DetermineEndDatePageMixin, BaseGazetteSpider):
    """Gazette spider for Campos dos Goytacazes (RJ)."""

    TERRITORY_ID = "3301009"
    allowed_domains = ["www.campos.rj.gov.br"]
    name = "rj_campos_dos_goytacazes"
    start_date = date(2010, 6, 10)
    # November 17th, 2017 was the date of the last Diário Oficial gazette and
    # also the date of the first Diário Oficial Eletrônico gazette

    def __init__(self, *args, **kwargs):
        super(RjCamposDosGoytacazesSpider, self).__init__(*args, **kwargs)
        # Date of the gazette whose rows are currently being accumulated.
        self.current_date = None
        # Edition number of that same gazette ("" when not applicable).
        self.current_edition_number = ""
        # Row texts already processed, to skip duplicates across page requests.
        self.row_texts = set()
        # Given that there are some gazettes with multiple rows and they can be
        # spread across subsequent pages, we temporarily store data, which is consumed
        # once we confirm a different date found for the next row in the page.
        #
        # That storage is a dict in which the key is a bool value for is_extra_edition
        # and each dict entry's value is a list of URLs associated to that gazette date
        # and its is_extra_edition value.
        self.collected_data_for_current_date: dict[bool, list[str]] = {}

    def start_requests(self):
        """Start the crawl at the page estimated to contain end_date."""
        tentative_page_number_associated_with_end_date = (
            self.calculate_tentative_page_number_associated_with_end_date()
        )
        yield Request(
            url=self.BASE_URL.format(
                PAGE_NUMBER=tentative_page_number_associated_with_end_date
            ),
            callback=self.find_end_date_page,
            cb_kwargs={
                "current_page_number": tentative_page_number_associated_with_end_date,
                # None: search direction not yet determined.
                "search_towards_the_past": None,
            },
            dont_filter=True,  # the page may have already been requested when determining the end_date page
        )

    def triage_data_per_row(self, gazette_text):
        """Triage gazette data for a gazette that is from November 17th 2017 or earlier.

        It returns a 3-tuple:
            the extracted gazette date (None when not extractable),
            whether it is an extra edition, and
            the edition number when applicable ("" otherwise).
        """
        gazette_date = None
        is_extra_edition = (
            gazette_text.startswith("Suplemento") or "Extra" in gazette_text
        )
        edition_number = ""
        if not gazette_text:
            return gazette_date, is_extra_edition, edition_number
        gazette_date = self.extract_date_from_gazette_text(gazette_text)
        # Dates outside [start_date, end_date] are returned without looking up
        # an edition number.
        if (
            not gazette_date
            or gazette_date < self.start_date
            or self.end_date < gazette_date
        ):
            return gazette_date, is_extra_edition, edition_number
        edition_number_match = EDITION_NUMBER_RE.search(gazette_text)
        if edition_number_match:
            edition_number = edition_number_match.group(1).strip()
        return gazette_date, is_extra_edition, edition_number

    def instantiate_gazettes_and_reset_stored_data(self):
        """Instantiates Gazette from the most recent data and resets the state.

        Returns a list of Gazette items — one per is_extra_edition bucket of
        the accumulated URLs — or an empty list when nothing was accumulated.
        """
        if not self.current_date:
            return []
        gazettes = [
            Gazette(
                date=self.current_date,
                edition_number=str(self.current_edition_number),
                file_urls=file_urls,
                is_extra_edition=is_extra_edition,
                power="executive",
            )
            for (
                is_extra_edition,
                file_urls,
            ) in self.collected_data_for_current_date.items()
        ]
        # Reset the accumulation state for the next gazette.
        self.current_date = None
        self.current_edition_number = ""
        self.collected_data_for_current_date = {}
        return gazettes

    def triage_data_per_page(self, response):
        """Triage gazette data from a page row.

        Once we notice that either the edition_number or the gazette_date changed
        between rows, we collect the gazettes.
        Otherwise, we triage from the next page.
        """
        is_gazette_date_before_start_date = False
        for row_element in response.css("ul.ul-licitacoes li"):
            gazette_text = row_element.css("h4::text").get("").strip()
            # NOTE(review): .get() may return None when a row has no link,
            # making .strip() raise AttributeError — confirm every row always
            # carries an <a href>.
            file_url = row_element.css("a::attr(href)").get().strip()
            if not gazette_text or not file_url:
                continue
            if gazette_text in self.row_texts:
                self.logger.info(
                    f"Textual value '{gazette_text}' was already processed earlier"
                )
                continue
            gazette_date, is_extra_edition, edition_number = self.triage_data_per_row(
                gazette_text
            )
            if not gazette_date:
                continue
            if gazette_date < self.start_date:
                # NOTE(review): breaking here presumes rows are ordered from
                # newest to oldest, so no later row could still be in range.
                is_gazette_date_before_start_date = True
                break
            if (
                self.current_edition_number != edition_number
                or self.current_date != gazette_date
            ):
                # A new gazette started: flush whatever was accumulated so far.
                yield from self.instantiate_gazettes_and_reset_stored_data()
            self.row_texts.add(gazette_text)
            self.current_edition_number = edition_number
            self.current_date = gazette_date
            self.collected_data_for_current_date.setdefault(
                is_extra_edition, []
            ).append(file_url)
        # NOTE(review): the XPath starts with "//", which in Scrapy searches
        # the whole document rather than being scoped under ".pagination" —
        # confirm this is intended.
        next_url = (
            response.css(".pagination")
            .xpath("//a[contains(text(), 'Proxima')]/@href")
            .get()
        )
        if is_gazette_date_before_start_date or not next_url:
            # Collect the gazettes using the triaged data
            #
            # Regarding the `not next_url` condition, due to the Gazette instantiation
            # construct of this spider, this is a corner case when we collect without
            # an explicit start_date, meaning that is_gazette_date_before_start_date
            # wouldn't be set to True
            yield from self.instantiate_gazettes_and_reset_stored_data()
        else:
            # Keep triaging data
            yield Request(
                response.urljoin(next_url),
                callback=self.triage_data_per_page,
                dont_filter=True,  # the page may have already been requested when determining the end_date page
            )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment