@ayharano
Created December 31, 2022 16:57
rj_campos_dos_goytacazes.py with tentative end_date search
import calendar
import re
from datetime import date, timedelta
from string import punctuation
import dateparser
from scrapy import Request
from gazette.items import Gazette
from gazette.spiders.base import BaseGazetteSpider
EDITION_NUMBER_RE = re.compile(r"Edição\s+(?:Extra\s+)?-?\s*(\d+)")
REGULAR_FULL_NOMINAL_DATE_RE = re.compile(
r"\s+"
r"(\d{1,2})(?# day)"
r"\s+d?e?\s*"
rf"([^\d\s{punctuation}]+)(?# nominal month in pt)"
r"\s+d?e?\s*"
r"(\d{4})(?# year)",
flags=re.IGNORECASE,
)
MONTH_YEAR_NOMINAL_DATE_RE = re.compile(
r"Oficial\s+de\s*"
rf"([^\d\s{punctuation}]+)(?# nominal month in pt)"
r"\s+d?e?\s*"
r"(\d{4})(?# year)",
flags=re.IGNORECASE,
)
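# Illustrative regex behavior (a sketch: the first title below appears in the
# gazette system, while the other two strings are hypothetical examples):
#
#   REGULAR_FULL_NOMINAL_DATE_RE.search(
#       "Diário Oficial Eletrônico de 14 de Agosto de 2021 - Edição Extra"
#   ).groups()  # -> ("14", "Agosto", "2021")
#
#   EDITION_NUMBER_RE.search("Edição Extra - 123").group(1)  # -> "123"
#
#   MONTH_YEAR_NOMINAL_DATE_RE.search("Diário Oficial de Outubro de 2012")
#   .groups()  # -> ("Outubro", "2012")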
class DetermineEndDatePageMixin:
"""Collection of attributes and methods to determine the end_date page"""
BASE_URL = (
"https://www.campos.rj.gov.br/diario-oficial.php"
"?PGpagina={PAGE_NUMBER}&PGporPagina=15"
)
# the current gazette system only allows at most this number of rows per page,
# even when explicitly requesting more than that
MAX_ROWS_PER_PAGE = 15
MINIMUM_ALLOWED_PAGE_NUMBER = 1
def calculate_tentative_page_number_associated_with_end_date(self):
"""Determine the page number that the end_date gazette might be at.
        Facts for the design of this method:

        - The first page of the pagination contains the most recent gazette.
        - We assume that most Saturdays and Sundays have no gazette.
          Exception example:
          Diário Oficial Eletrônico de 14 de Agosto de 2021 - Edição Extra
        - Even though the number of rows for the remaining days may vary from
          zero to more than one, we assume that each non-Saturday and
          non-Sunday day has exactly one gazette.
        - Given the potential deviations from these assumptions, such as a day
          without a gazette or a day with multiple rows, the target end_date
          gazette might be available on the calculated page number or on an
          adjacent one.

        This method adopts the following heuristic: we count the non-Saturday
        and non-Sunday days from the target end_date until the day this method
        runs, perform an integer division of that count by the maximum number
        of rows on a page, and use the result as the chosen page number.
        We only replace the calculated number when it is less than one: in
        that case, we use 1, as the page numbering begins at 1.

        Returns a positive int.
        """
today = date.today()
if today <= self.end_date:
return self.MINIMUM_ALLOWED_PAGE_NUMBER
non_saturday_nor_sunday_day_count = 0
current_day = self.end_date
one_day_timedelta = timedelta(days=1)
saturday_and_sunday_set = {
6, # Saturday
7, # Sunday
}
while current_day <= today:
if current_day.isoweekday() not in saturday_and_sunday_set:
non_saturday_nor_sunday_day_count += 1
current_day = current_day + one_day_timedelta
self.logger.info(
f"Number of non-Saturday and non-Sunday days from {self.end_date} to"
f" {today}, inclusive: {non_saturday_nor_sunday_day_count}"
)
tentative_page_number = (
non_saturday_nor_sunday_day_count // self.MAX_ROWS_PER_PAGE
)
if tentative_page_number < self.MINIMUM_ALLOWED_PAGE_NUMBER:
tentative_page_number = self.MINIMUM_ALLOWED_PAGE_NUMBER
self.logger.info(
f"Tentative page number for {self.end_date} has defaulted to"
f" {self.MINIMUM_ALLOWED_PAGE_NUMBER}"
)
else:
self.logger.info(
f"Tentative page number for {self.end_date} calculated as"
f" {tentative_page_number}"
)
return tentative_page_number
def extract_date_from_regular_full_nominal_date(self, match_):
textual_date = f"{match_.group(1)} de {match_.group(2)} de {match_.group(3)}"
gazette_date = dateparser.parse(textual_date, languages=["pt"]).date()
return gazette_date
def extract_date_from_month_year_nominal_date(self, match_):
        # To avoid issues in the date conversion, we first parse the text as if
        # it referred to the first day of the month
textual_date = f"01 de {match_.group(1)} de {match_.group(2)}"
gazette_date = dateparser.parse(textual_date, languages=["pt"]).date()
# As this case is a collection of gazettes for the full month,
# we consider the gazette date as the last day of that month
last_day_of_the_month = calendar.monthrange(
year=gazette_date.year, month=gazette_date.month
)[1]
gazette_date = gazette_date.replace(day=last_day_of_the_month)
return gazette_date
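    # A quick sketch of the month-year conversion above, assuming a
    # hypothetical row titled "Diário Oficial de Outubro de 2012" from the
    # single-row-per-month period:
    #   dateparser.parse("01 de Outubro de 2012", languages=["pt"]).date()
    #   # -> date(2012, 10, 1)
    #   calendar.monthrange(2012, 10)[1]  # -> 31
    #   # so the resulting gazette date is date(2012, 10, 31)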
def extract_date_from_gazette_text(self, gazette_text):
if not gazette_text:
return None
text = (
gazette_text
# The extra edition for August 28th, 2018 has a typo in the month name.
.replace("Agosoto", "Agosto")
# The edition for December 17th, 2012 has a typo in the month name.
.replace("Dezembrbo", "Dezembro")
)
if match_ := REGULAR_FULL_NOMINAL_DATE_RE.search(text):
return self.extract_date_from_regular_full_nominal_date(match_)
        # From October 2012 to October 2013, the site lists a single row per month.
        # Each row provides a .rar file, and some of those files are missing.
if match_ := MONTH_YEAR_NOMINAL_DATE_RE.search(text):
return self.extract_date_from_month_year_nominal_date(match_)
self.logger.warning(f"No date could be extracted from '{text}'")
return None
def validate_date_condition_over_rows(
self,
remaining_rows_for_current_page,
condition,
):
"""Returns the found gazette date once the provided condition is valid.
In case it is not found across the iteration, returns None.
"""
gazette_date = None
for row_element in remaining_rows_for_current_page:
gazette_text = row_element.css("h4::text").get("").strip()
if not gazette_text:
continue
gazette_date = self.extract_date_from_gazette_text(gazette_text)
if not gazette_date:
continue
if condition(gazette_date):
return gazette_date
return gazette_date
def find_first_valid_date_in_rows(self, remaining_rows_for_current_page):
return self.validate_date_condition_over_rows(
remaining_rows_for_current_page, condition=lambda date_: date_ is not None
)
def find_end_date_or_earlier_in_rows(self, remaining_rows_for_current_page):
return self.validate_date_condition_over_rows(
remaining_rows_for_current_page,
condition=lambda date_: date_ <= self.end_date,
)
def find_end_date_page(
self,
response,
current_page_number,
search_towards_the_past,
):
"""Determine the page for end_date.
In most cases, the tentative_page_number_associated_with_end_date strategy is
a conservative approach, as at 2022-10-05 there are consistently one or more gazettes
per non-Saturday and non-Sunday weekdays. It means that to determine the page for
end_date we will have to browse greater page numbers to find the actual page associated with end_date.
However, we might face a period when, for any reason, the gazettes were not published as expected.
A fallback strategy consists in browsing pages with lower page number than
the tentative_page_number_associated_with_end_date value.
We control the direction of the search by using the `search_towards_the_past` parameter:
- None: we still need to determine which way we need to find the end_date page
- False: tentative_page_number_associated_with_end_date strategy failed due to having more
days without any published gazettes.
- True: tentative_page_number_associated_with_end_date strategy worked, and we need to find
end_date in pages associated with earlier gazettes.
"""
was_end_date_page_found = False
remaining_rows_for_current_page = iter(response.css("ul.ul-licitacoes li"))
gazette_date = self.find_first_valid_date_in_rows(
remaining_rows_for_current_page
)
if not gazette_date:
raise ValueError(f"No valid dates were found for this page: {response.url}")
if not search_towards_the_past:
# This case is valid for both when search_towards_the_past is None and when it is False
if gazette_date < self.end_date:
# First valid date is earlier than end_date
if current_page_number <= self.MINIMUM_ALLOWED_PAGE_NUMBER:
# We already reached the most recent page, so we will start triaging the data
was_end_date_page_found = True
self.logger.info("Page with the most recent gazettes reached")
else:
# We need to retrieve pages associated with newer gazettes
search_towards_the_past = False
else:
search_towards_the_past = True
        if (
            search_towards_the_past
        ):  # Deliberately kept separate: the block above may have just set this flag
was_end_date_page_found = gazette_date <= self.end_date
if not was_end_date_page_found:
gazette_date = self.find_end_date_or_earlier_in_rows(
remaining_rows_for_current_page
)
was_end_date_page_found = gazette_date <= self.end_date
if was_end_date_page_found:
# As the page was found, we finally start triaging the data
self.logger.info(
f"{gazette_date.isoformat()} was the closest date to end_date"
f" {self.end_date.isoformat()}, and it was found on page"
f" {current_page_number}. Starting gazette collection."
)
for from_end_date_backward in self.triage_data_per_page(response):
yield from_end_date_backward
else:
assert search_towards_the_past is not None
if search_towards_the_past:
next_call_page_number = current_page_number + 1
self.logger.info(
f"{gazette_date.isoformat()} was the earliest date found on page"
f" {current_page_number}. Searching the closest date to end_date"
f" {self.end_date} on page {next_call_page_number}."
)
else:
next_call_page_number = current_page_number - 1
self.logger.info(
f"{gazette_date.isoformat()} was the most recent date found on page"
f" {current_page_number}. Searching the closest date to end_date"
f" {self.end_date} on page {next_call_page_number}."
)
yield Request(
url=self.BASE_URL.format(PAGE_NUMBER=next_call_page_number),
callback=self.find_end_date_page,
cb_kwargs={
"current_page_number": next_call_page_number,
"search_towards_the_past": search_towards_the_past,
},
)
class RjCamposDosGoytacazesSpider(DetermineEndDatePageMixin, BaseGazetteSpider):
TERRITORY_ID = "3301009"
allowed_domains = ["www.campos.rj.gov.br"]
name = "rj_campos_dos_goytacazes"
start_date = date(2010, 6, 10)
# November 17th, 2017 was the date of the last Diário Oficial gazette and
# also the date of the first Diário Oficial Eletrônico gazette
def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
self.current_date = None
self.current_edition_number = ""
self.row_texts = set()
        # Given that some gazettes have multiple rows, which can be spread
        # across subsequent pages, we temporarily store data and only consume
        # it once we find a different date in a following row.
        #
        # The storage is a dict that maps an is_extra_edition bool to the list
        # of URLs associated with the current gazette date and that
        # is_extra_edition value.
self.collected_data_for_current_date: dict[bool, list[str]] = {}
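        # Example shape of the storage above (hypothetical URLs):
        #   {
        #       False: ["https://www.campos.rj.gov.br/exemplo-regular.pdf"],
        #       True: ["https://www.campos.rj.gov.br/exemplo-extra.pdf"],
        #   }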
def start_requests(self):
tentative_page_number_associated_with_end_date = (
self.calculate_tentative_page_number_associated_with_end_date()
)
yield Request(
url=self.BASE_URL.format(
PAGE_NUMBER=tentative_page_number_associated_with_end_date
),
callback=self.find_end_date_page,
cb_kwargs={
"current_page_number": tentative_page_number_associated_with_end_date,
"search_towards_the_past": None,
},
dont_filter=True, # the page may have already been requested when determining the end_date page
)
def triage_data_per_row(self, gazette_text):
"""Triage gazette data for a gazette that is from November 17th 2017 or earlier.
It returns a 3-tuple:
the extracted gazette date,
whether it is an extra edition, and
the edition number when applicable.
"""
gazette_date = None
is_extra_edition = (
gazette_text.startswith("Suplemento") or "Extra" in gazette_text
)
edition_number = ""
if not gazette_text:
return gazette_date, is_extra_edition, edition_number
gazette_date = self.extract_date_from_gazette_text(gazette_text)
if (
not gazette_date
or gazette_date < self.start_date
or self.end_date < gazette_date
):
return gazette_date, is_extra_edition, edition_number
edition_number_match = EDITION_NUMBER_RE.search(gazette_text)
if edition_number_match:
edition_number = edition_number_match.group(1).strip()
return gazette_date, is_extra_edition, edition_number
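    # A sketch of triage_data_per_row for the extra-edition title quoted in
    # the mixin docstring, assuming the date falls within
    # [start_date, end_date]:
    #   self.triage_data_per_row(
    #       "Diário Oficial Eletrônico de 14 de Agosto de 2021 - Edição Extra"
    #   )  # -> (date(2021, 8, 14), True, "") as the text has no edition number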
def instantiate_gazettes_and_reset_stored_data(self):
"""Instantiates Gazette from the most recent data and resets the state."""
if not self.current_date:
return []
gazettes = [
Gazette(
date=self.current_date,
edition_number=str(self.current_edition_number),
file_urls=file_urls,
is_extra_edition=is_extra_edition,
power="executive",
)
for (
is_extra_edition,
file_urls,
) in self.collected_data_for_current_date.items()
]
self.current_date = None
self.current_edition_number = ""
self.collected_data_for_current_date = {}
return gazettes
def triage_data_per_page(self, response):
"""Triage gazette data from a page row.
Once we notice that either the edition_number or the gazette_date changed
between rows, we collect the gazettes.
Otherwise, we triage from the next page.
"""
is_gazette_date_before_start_date = False
for row_element in response.css("ul.ul-licitacoes li"):
gazette_text = row_element.css("h4::text").get("").strip()
file_url = row_element.css("a::attr(href)").get().strip()
if not gazette_text or not file_url:
continue
if gazette_text in self.row_texts:
self.logger.info(
f"Textual value '{gazette_text}' was already processed earlier"
)
continue
gazette_date, is_extra_edition, edition_number = self.triage_data_per_row(
gazette_text
)
if not gazette_date:
continue
if gazette_date < self.start_date:
is_gazette_date_before_start_date = True
break
if (
self.current_edition_number != edition_number
or self.current_date != gazette_date
):
yield from self.instantiate_gazettes_and_reset_stored_data()
self.row_texts.add(gazette_text)
self.current_edition_number = edition_number
self.current_date = gazette_date
self.collected_data_for_current_date.setdefault(
is_extra_edition, []
).append(file_url)
next_url = (
response.css(".pagination")
.xpath("//a[contains(text(), 'Proxima')]/@href")
.get()
)
if is_gazette_date_before_start_date or not next_url:
            # Collect the gazettes using the triaged data.
            #
            # The `not next_url` condition handles a corner case of this
            # spider's Gazette instantiation: when collecting without an
            # explicit start_date, is_gazette_date_before_start_date would
            # never be set to True, so the last page must still flush the data.
yield from self.instantiate_gazettes_and_reset_stored_data()
else:
# Keep triaging data
yield Request(
response.urljoin(next_url),
callback=self.triage_data_per_page,
dont_filter=True, # the page may have already been requested when determining the end_date page
)
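# A typical local run for this spider (a sketch, assuming the querido-diario
# project layout, where BaseGazetteSpider accepts ISO-formatted date
# arguments, and a hypothetical date range):
#
#   scrapy crawl rj_campos_dos_goytacazes \
#       -a start_date=2022-12-01 -a end_date=2022-12-31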