Last active
August 4, 2021 11:16
-
-
Save thehapyone/8a4bd80c6179c6dfad34d939281d8314 to your computer and use it in GitHub Desktop.
Get new first hand apartments notification from AWS SNS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from typing import Optional, Union, Tuple, NamedTuple | |
import hashlib | |
import logging | |
import os | |
import boto3 | |
import requests | |
from bs4 import BeautifulSoup, NavigableString, Tag | |
# Module-wide logger; DEBUG level so the per-apartment debug messages are emitted too.
logger = logging.getLogger("scrapper_logger")
logger.setLevel(logging.DEBUG)

# AWS resource handles, created once at import time.
db = boto3.resource("dynamodb")
sns = boto3.resource("sns")

# DynamoDB table the apartment records are read from and written to.
table = db.Table("ayo_housing_table")

# SNS topic the notifications are published to ('AWS TOPIC ARN' is a placeholder value).
topic = sns.Topic("AWS TOPIC ARN")

# Google API key read from the environment; None when GOOGLE_API_KEY is not set.
api_key = os.environ.get("GOOGLE_API_KEY")
class Apartment(NamedTuple):
    """
    Immutable record holding everything scraped for a single apartment.

    The field order is significant: it is the order in which ``_asdict()``
    yields the values when a record is stored or turned into a message.
    """

    # Listing details
    title: str
    location: str
    link: str
    area: str
    rent: str
    rooms: str
    access: str
    number: str
    others: str

    # Distance / travel-time estimate relative to the reference location
    distance: Optional[str]
    time: Optional[str]

    # Identifier used as the key when looking the apartment up in the db
    unique_id: str
class Scrapper:
    """
    The scrapper entry class.

    The listing page is downloaded once when the object is created. The detail
    page of every listed apartment is fetched on demand and enriched with a
    distance / travel-time estimate from the Google Distance Matrix API.

    :param str url: The housing url
    :param str google_api_key: The Google API key for accessing the map features
    :param str reference_location: Origin the distance estimates are measured from
    """

    # Seconds to wait for a single HTTP response. Without a timeout a stalled
    # server would block the caller until the surrounding runtime kills it.
    REQUEST_TIMEOUT = 10

    def __init__(self, url, google_api_key, reference_location="vasastan"):
        self.url = url
        self._google_api_key = google_api_key
        self.reference_location = reference_location
        self.house = self.initialize_soup()

    def _fetch_soup(self, url: str) -> "BeautifulSoup":
        """
        Download ``url`` and parse the body as HTML.

        :raises Exception: on network errors, timeouts and HTTP error statuses
        """
        page = requests.get(url=url, verify=True, timeout=self.REQUEST_TIMEOUT)
        # Fail here rather than parse an error page whose markup does not match.
        page.raise_for_status()
        return BeautifulSoup(page.content, 'html.parser')

    def initialize_soup(self) -> Optional["BeautifulSoup"]:
        """
        Download and parse the housing listing page.

        :return: the parsed page, or ``None`` (after logging) when it could not be loaded
        """
        try:
            return self._fetch_soup(self.url)
        except Exception:
            logger.error("Could not initialize Scrapper site object", exc_info=True)
            return None

    def get_apartments_section(self) -> Optional[Union["Tag", "NavigableString"]]:
        """
        Returns the BeautifulSoup section of the area of the apartment.

        :return: the ``local-objects-info`` section, or ``None`` when the page has none
        :raises RuntimeError: when the listing page could not be loaded at construction
        """
        if self.house is None:
            raise RuntimeError("The housing page was not loaded, so there is nothing to scrape")
        return self.house.find('section', class_='local-objects-info')

    def get_apartments(self) -> tuple:
        """
        Gets the current active apartments from the housing site.

        Apartments whose detail page could not be scraped are left out, so every
        element of the result is an ``Apartment``.

        :raises RuntimeError: when the expected page structure is missing
        """
        if self.no_of_apartments() == 0:
            return tuple()

        carousel = self.get_apartments_section().find('div', class_='blocks-wrapper carousel')
        if carousel is None:
            raise RuntimeError("Could not locate the apartment carousel; the page layout may have changed")

        cells = carousel.find_all('div', class_='block-item semi carousel-cell')
        details = (self.get_apartment_details(cell) for cell in cells)
        return tuple(apartment for apartment in details if apartment is not None)

    def no_of_apartments(self) -> int:
        """
        Returns the no of current active apartments.

        :raises RuntimeError: when the counters cannot be found on the page
        """
        section = self.get_apartments_section()
        term_list = section.find('ul', class_='ojects-term-list') if section is not None else None
        if term_list is None:
            raise RuntimeError("Could not locate the apartment counters; the page layout may have changed")

        counts = [int(item.text) for item in term_list.find_all('span', class_='total-article')]
        # The apartments count is the second counter in the list.
        if len(counts) < 2:
            raise RuntimeError(f"Expected at least two counters on the page but found {len(counts)}")
        return counts[1]

    def get_apartment_details(self, apartment_item: "BeautifulSoup") -> Optional["Apartment"]:
        """
        Gets all the scraped details for one apartment.

        :param apartment_item: the listing element that links to the detail page
        :return: the apartment, or ``None`` (after logging) when the detail page
            could not be fetched or does not have the expected structure
        """
        anchor = apartment_item.find('a')
        link = anchor.get('href') if anchor is not None else None
        if not link:
            logger.error("Could not find the apartment link in the listing item")
            return None

        try:
            apartment_soup = self._fetch_soup(link)
        except Exception:
            logger.error("Could not retrieve the apartment details", exc_info=True)
            return None

        heading = apartment_soup.find('h1')
        content = apartment_soup.find('section', class_='content-block')
        information = content.find('div', class_='box-container') if content is not None else None
        if heading is None or information is None:
            logger.error(f"Unexpected page layout for apartment {link}; skipping it")
            return None
        title = heading.text.strip()

        # Collect the labelled values; a field is recognised by the first word of its label.
        results = {}
        targets = ['Tillträde', 'Antal', 'Area', 'Hyra', 'Objektsnummer', 'Övrigt']
        for info in information.find_all('strong', class_='info-item'):
            data = info.find('span', class_='data')
            if data is None:
                continue
            label = info.text.strip().lower().split(" ")[0]
            for target in targets:
                if target.lower() in label:
                    results[target] = data.text.strip()
                    break

        # Get the location estimate
        location, distance, travel_time = self.location_estimate(destination=title)

        link = link.strip()
        return Apartment(
            title=title,
            location=location.strip(),
            link=link,
            area=results.get("Area", "").strip(),
            access=results.get("Tillträde", "").strip(),
            number=results.get("Objektsnummer", "").strip(),
            rent=results.get("Hyra", "").strip(),
            rooms=results.get("Antal", "").strip(),
            others=results.get("Övrigt", "").strip(),
            distance=distance.strip(),
            time=travel_time.strip(),
            # Fingerprint of link + title, used to recognise the apartment later on.
            unique_id=hashlib.md5(f'{link}{title}'.encode()).hexdigest(),
        )

    def location_estimate(self, destination: str) -> Tuple[str, str, str]:
        """
        Using the Google API to estimate the location distance and time from the
        reference location.

        This never fails: when the service cannot be reached or gives no usable
        answer, ``destination`` is returned as the location and the distance and
        travel time are empty strings.

        :param destination: free-text destination to look up
        :return: ``(location, distance, travel_time)``
        """
        url = 'https://maps.googleapis.com/maps/api/distancematrix/json?'
        params = dict(
            origins=self.reference_location,
            destinations=destination,
            key=self._google_api_key
        )
        try:
            response = requests.get(url, params=params, verify=True,
                                    timeout=self.REQUEST_TIMEOUT).json()
        except Exception:
            logger.warning("Error occurred while communicating with Google distance matrix service.",
                           exc_info=True)
            return destination, "", ""

        location, distance, travel_time = self._parse_distance_matrix(response, destination)
        if not distance or not travel_time:
            logger.warning(f"Distance and Travel time couldn't not be fetched. Data packet is "
                           f"{json.dumps(response)}")
        return location, distance, travel_time

    @staticmethod
    def _parse_distance_matrix(response, destination: str) -> Tuple[str, str, str]:
        """
        Extract ``(location, distance, travel_time)`` from a distance matrix reply.

        Tolerates error replies (no rows, no elements, elements without distance
        or duration): the location falls back to ``destination`` and the distance
        and travel time fall back to empty strings.
        """
        if not isinstance(response, dict):
            return destination, "", ""

        # Prefer the address as resolved by the API over the raw destination text.
        addresses = response.get('destination_addresses') or []
        location = addresses[0] if addresses else destination

        try:
            element = response['rows'][0]['elements'][0]
            distance = element['distance']['text']
            travel_time = element['duration']['text']
        except (KeyError, IndexError, TypeError):
            # Either both values are reported or neither is.
            distance, travel_time = "", ""
        return location, distance, travel_time
def find_unique_apartments(apartments: Tuple["Apartment", ...]) -> Tuple["Apartment", ...]:
    """
    Returns the apartments that are not currently saved in the db.

    ``None`` entries (apartments whose details could not be scraped) are skipped,
    and an apartment whose unique id occurs more than once in ``apartments`` is
    returned only once.

    :param apartments: the scraped apartments
    :return: the new apartments, in their original order
    """
    unique_apartments = []
    seen_ids = set()
    for apartment in apartments:
        if apartment is None:
            continue

        unique_id = apartment.unique_id
        if unique_id in seen_ids:
            # Already handled in this batch; no need to ask the db again.
            continue
        seen_ids.add(unique_id)

        # dynamodb fetch for apartment
        result = table.get_item(Key={'unique_id': unique_id}).get("Item", None)
        if result is None:
            unique_apartments.append(apartment)
        else:
            logger.debug(f"Apartment with unique id {unique_id} is already present in db")
    return tuple(unique_apartments)
def add_apartments_to_db(apartments: Tuple[Apartment]):
    """
    Store every given apartment in the db so it is not processed again.

    A failure to store one apartment is logged as a warning and does not stop
    the remaining apartments from being stored.
    """
    for entry in apartments:
        try:
            # One item per apartment, with the record's fields as attributes.
            record = dict(entry._asdict())
            table.put_item(Item=record)
        except Exception:
            logger.warning(
                f"An error has occurred in saving apartment with unique id {entry.unique_id} to db.",
                exc_info=True,
            )
def publish_apartments(apartments: Tuple["Apartment", ...]):
    """
    Publish the apartments to the SNS topic for subscribers to react.

    One message is sent for the whole batch: a short text as the default (SMS)
    body and the full apartment details as the email body. The apartments are
    saved to the db only after the message was published, so a failed
    publication is retried on the next run.

    Nothing is published when there is no apartment to announce.

    :param apartments: the new apartments; ``None`` entries are ignored
    """
    apartments = tuple(apartment for apartment in apartments if apartment is not None)
    if not apartments:
        # Avoid sending an "URGENT" notification that announces zero apartments.
        return

    apartment_message_list = []
    for apartment in apartments:
        # One "Field: value" line per attribute, followed by a blank separator.
        message = "\n".join(f'{key.capitalize()}: {value}'
                            for key, value in apartment._asdict().items())
        apartment_message_list.append(f"{message}\n\n")

    header = f"{len(apartments)} New apartments has been found. See below and apply to each one. \n"
    deliver_message = header + "\n".join(apartment_message_list)
    sms_message = f"{len(apartments)} New apartments has been found. See email and apply."

    # With MessageStructure='json' the message is a JSON object keyed by protocol;
    # "default" is used for every protocol that has no entry of its own.
    message_body = dict(
        default=sms_message,
        email=deliver_message
    )
    try:
        publish_response = topic.publish(
            Message=json.dumps(message_body),
            Subject="URGENT - New Apartment(s) found",
            MessageStructure='json'
        )
        logger.debug(f'Message has been published. MessageId - {publish_response.get("MessageId")}')
    except Exception:
        logger.error("Could not publish message to SNS topic", exc_info=True)
    else:
        # Save the apartments to the db if no error.
        add_apartments_to_db(apartments)
def lambda_handler(event, context):
    """
    Entry point for the lambda invocation.

    Scrapes the housing site, keeps the apartments that are not in the db yet
    and publishes them. ``event`` and ``context`` are not used.

    :return: a dict with ``status_code`` and ``status``; 401 / 'failed' when the
        scraping raised, 201 / 'success' otherwise
    """
    website = "https://wahlinfastigheter.se/lediga-objekt/lagenheter/"
    scrapper = Scrapper(website, api_key)

    try:
        apartments = scrapper.get_apartments()
    except Exception:
        logger.error("An error has occurred", exc_info=True)
        return dict(status_code=401, status='failed')

    logger.info(f"Apartments: {apartments}")
    outcome = dict(status_code=201, status='success')

    # Nothing listed right now: the run still counts as a success.
    if not apartments:
        return outcome

    unique_apartments = find_unique_apartments(apartments)
    logger.info(f"Unique apartments: {unique_apartments}")
    if unique_apartments:
        publish_apartments(unique_apartments)
    return outcome
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment