Last active
January 28, 2024 21:46
-
-
Save nehiljain/3f3542c1c9a81d6a41fdc501ec745367 to your computer and use it in GitHub Desktop.
Google Hotel Search Scraper V1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import instructor | |
import base64 | |
import logging | |
import os | |
from openai import AsyncOpenAI, OpenAI | |
import asyncio | |
from pydantic import Field, BaseModel | |
from typing import List | |
from dotenv import find_dotenv, load_dotenv | |
# Pull environment variables (e.g. OPENAI_API_KEY) from the nearest .env file.
load_dotenv(find_dotenv())

# Application-wide logger configuration.
logging.basicConfig()
logger = logging.getLogger("app")
logger.setLevel(logging.INFO)
class Rate(BaseModel):
    """A single hotel rate offer: the provider, its currency, and the price."""

    provider: str = Field(description="Provider name", example="Booking.com")
    currency: str = Field(description="Currency", example="USD")
    rate: float = Field(description="Rate", example=110.0)
class RateList(BaseModel):
    """Structured model response: all rates found in one screenshot.

    ``error`` is set by the model when it could not extract any rates.
    """

    rates: List[Rate] = Field(
        description="List of Hotel Rates. Each rate is a list of 3 elements: [PROVIDER, CURRENCY, RATE]",
        example=[
            Rate(provider="Booking.com", currency="USD", rate=110.0),
            Rate(provider="Expedia", currency="USD", rate=90.0),
        ],
    )
    error: bool = Field(default=False, description="Error flag. True if there was an error, False otherwise.")
def encode_image(image_path) -> str:
    """Return the file at *image_path* as a base64 string (UTF-8 decoded)."""
    with open(image_path, "rb") as fh:
        raw_bytes = fh.read()
    return base64.b64encode(raw_bytes).decode("utf-8")
# Async OpenAI client patched by instructor so chat completions can return
# pydantic models directly (MD_JSON mode parses markdown-fenced JSON).
_async_openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
client_image = instructor.patch(_async_openai_client, mode=instructor.Mode.MD_JSON)
async def read_prices(image_file_path: str, hotel_name: str) -> RateList:
    """Extract provider/currency/rate triples from a hotel-search screenshot.

    Sends the base64-encoded image plus a prompt to the vision model and lets
    instructor parse the reply into a :class:`RateList`.
    """
    image_b64 = encode_image(image_file_path)
    text_part = {
        "type": "text",
        "text": f"Find all the prices from the attached picture of google hotel search results of {hotel_name}. The price should have an associated provider as well.",
    }
    image_part = {
        "type": "image_url",
        "image_url": f"data:image/png;base64,{image_b64}",
    }
    return await client_image.chat.completions.create(
        model="gpt-4-vision-preview",
        response_model=RateList,
        max_tokens=4000,
        temperature=0.0,
        messages=[{"role": "user", "content": [text_part, image_part]}],
    )
async def main():
    """Run one extraction against a local screenshot and log the result.

    Returns the :class:`RateList` produced by :func:`read_prices`.
    """
    # BUG FIX: read_prices requires both the screenshot path AND the hotel
    # name; the original call passed only the path, which raised a TypeError
    # before any API call was made. The hotel name below is a placeholder for
    # ad-hoc runs — substitute the hotel actually shown in the screenshot.
    result = await read_prices("full_page_screenshot.png", "Unknown Hotel")
    logger.info(result)
    return result


if __name__ == "__main__":
    asyncio.run(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from playwright.async_api import async_playwright, Playwright, TimeoutError | |
from pathlib import Path | |
import re | |
import pendulum | |
from typing import List | |
from pydantic import BaseModel, Field | |
from dagster import get_dagster_logger | |
# Add logger | |
semaphore = asyncio.Semaphore(3) | |
logger = get_dagster_logger() | |
class Price(BaseModel):
    """One raw rate line scraped from the Google Travel results page."""

    text: str = Field(description="Hotel rate text scrapped from the Meta website", example="Booking.com $110 USD")
    hotel_name: str = Field(description="Hotel name", example="Ace Hotel, NYC")
    checkin_date: str = Field(description="Checkin date", example="2024-04-12")
    length_of_stay: int = Field(description="Length of stay", example=1)
    scrapped_url: str = Field(description="URL of the scrapped page", example="https://www.google.com/travel/search")
async def fetch_google_hotel_prices_desktop(hotel_name, checkin_date, screenshot_dpath, length_of_stay=1) -> List[dict]:
    """Scrape desktop Google Travel rates for one hotel and check-in date.

    Drives a headless Chromium session: searches for the hotel, fills in the
    check-in/check-out dates, expands the full provider list, and collects
    every "<provider> $<price> ..." link text. A full-page screenshot is saved
    under *screenshot_dpath* for later (e.g. vision-model) processing.

    Args:
        hotel_name: Hotel search query, e.g. "hilton london kensington".
        checkin_date: ISO date string parseable by pendulum, e.g. "2024-04-15".
        screenshot_dpath: Directory where the full-page screenshot is written.
        length_of_stay: Nights to stay; determines the check-out date.

    Returns:
        A list of dicts (serialized :class:`Price`); on timeout, a single
        error dict with ``text == 'Failed to load page'``.
    """
    logger.debug(f"Starting desktop price fetch for hotel: {hotel_name}, check-in: {checkin_date}, stay length: {length_of_stay} days")
    # Semaphore bounds concurrent browser sessions across gathered tasks.
    async with semaphore:
        async with async_playwright() as playwright:
            prices = []
            chromium = playwright.chromium
            browser = await chromium.launch(
                headless=True,
            )
            page = await browser.new_page()
            logger.debug("Navigating to https://www.google.com/travel/search")
            try:
                await page.goto("https://www.google.com/travel/search")
                await page.get_by_role("combobox", name="Search for places, hotels and more").click()
                await page.get_by_role("combobox", name="Search for places, hotels and more").fill(hotel_name)
                await page.get_by_role("combobox", name="Search for places, hotels and more").press("Enter")
                logger.debug("Waiting for page load for hotel name %s checkin date %s and length of stay %s", hotel_name, checkin_date, length_of_stay)
                # Fixed sleeps give the SPA time to render; no stable load event
                # is available for these widgets.
                await asyncio.sleep(5)
                # Heuristic: the hotel-detail page shows Reviews/About/Overview
                # tabs; a disambiguation list shows the hotel name as a label.
                is_right_page = (await page.get_by_label("Reviews", exact=True).first.is_visible() and \
                    await page.get_by_label("About", exact=True).first.is_visible() and \
                    await page.get_by_label("Overview", exact=True).first.is_visible())
                is_list_item_shown = await page.get_by_label(hotel_name, exact=True).is_visible()
                logger.debug("Right page shown: %s, List item shown: %s", is_right_page, is_list_item_shown)
                checkin_picker = page.get_by_role("textbox", name=re.compile("Check-in", re.IGNORECASE))
                checkout_picker = page.get_by_role("textbox", name=re.compile("Check-out", re.IGNORECASE))
                orig_value = await checkin_picker.input_value()
                logger.debug("Original checkin date: %s", orig_value)
                dest_checkin_date = pendulum.parse(checkin_date)
                # The picker expects e.g. "Mon, Apr 15".
                await checkin_picker.fill(dest_checkin_date.format('ddd, MMM D'))
                await asyncio.sleep(4)
                await checkout_picker.fill((dest_checkin_date + pendulum.duration(days=length_of_stay)).format('ddd, MMM D'))
                await checkout_picker.press("Enter")
                logger.debug("Right page shown: %s, List item shown: %s", is_right_page, is_list_item_shown)
                logger.debug("New checkin date: %s and checkout date: %s", await checkin_picker.input_value(), await checkout_picker.input_value())
                await asyncio.sleep(2)
                logger.debug("New checkin date: %s and checkout date: %s", await checkin_picker.input_value(), await checkout_picker.input_value())
                # Keep expanding "View more options from $N" until the panel
                # shows "Fewer Options", i.e. the provider list is complete.
                # FIX: raw strings — '\$' and '\d' in plain strings are invalid
                # escape sequences (SyntaxWarning on Python 3.12+).
                while not await page.get_by_role("button", name=re.compile("Fewer Options", re.IGNORECASE)).is_visible():
                    await page.get_by_role("button", name=re.compile(r"View more options from \$\d+", re.IGNORECASE)).click()
                    await asyncio.sleep(6)
                logger.debug("All prices loaded")
                # Matches links of the form "<provider words> $<price> <words>".
                all_prices = await page.get_by_role("link", name=re.compile(r"\w+(\s\w+)* \$\d+ \w+(\s\w+)*")).all()
                logger.debug("Found %s prices for hotel %s", len(all_prices), hotel_name)
                for price in all_prices:
                    text = await price.all_inner_texts()
                    # Rules to clean up the text: drop the "View site" CTA and
                    # join the remaining lines with ';;' as a field separator.
                    text = re.sub(re.escape('View site'), '', text[0], flags=re.IGNORECASE)
                    text = text.replace('\n', ';;')
                    price_obj = Price(text=text + ";;desktop", hotel_name=hotel_name, checkin_date=checkin_date, length_of_stay=length_of_stay, scrapped_url=page.url)
                    logger.debug("Price object: %s", dict(price_obj))
                    prices.append(dict(price_obj))
                sanitized_hotel_name = re.sub(r'[^A-Za-z0-9]', '-', hotel_name)
                screenshot_filepath = Path(screenshot_dpath, f'desktop_full_page_screenshot__{sanitized_hotel_name}__{dest_checkin_date.format("YYYYMMDD")}__{pendulum.now("UTC").format("YYYYMMDDHHmm")}.png')
                logger.info(f"Saving screenshot to {screenshot_filepath}")
                await page.screenshot(path=screenshot_filepath, full_page=True)
                await browser.close()
                logger.debug("Closed browser")
                return prices
            except TimeoutError as e:
                logger.error(f"Failed to load page for hotel {hotel_name} with checkin date {checkin_date} and length of stay {length_of_stay}: {e}")
                # FIX: close the browser on the error path too, rather than
                # relying solely on async_playwright teardown.
                await browser.close()
                return [{
                    'hotel_name': hotel_name,
                    'checkin_date': checkin_date,
                    'length_of_stay': length_of_stay,
                    'scrapped_url': page.url,
                    'text': 'Failed to load page'
                }]
async def execute_price_scraping_workflow(inputs, screenshot_dpath="."):
    """Scrape all requested hotel/date combinations concurrently.

    Args:
        inputs: List of dicts with keys ``hotel_name``, ``checkin_date``, and
            optionally ``length_of_stay`` (defaults to 1 night).
        screenshot_dpath: Directory handed to each scrape for its screenshot;
            defaults to the current directory (new, backward-compatible).

    Returns:
        A list of per-hotel result lists, in the order of ``inputs``.
    """
    # BUG FIX: the original iterated "for inp in inp[0]", referencing `inp`
    # before assignment (NameError). It also never passed the required
    # `screenshot_dpath` argument (TypeError) and hard-coded
    # length_of_stay=1, ignoring each input's own value.
    tasks = (
        fetch_google_hotel_prices_desktop(
            hotel_name=inp['hotel_name'],
            checkin_date=inp['checkin_date'],
            screenshot_dpath=screenshot_dpath,
            length_of_stay=inp.get('length_of_stay', 1),
        )
        for inp in inputs
    )
    results = await asyncio.gather(*tasks)
    return results
if __name__ == "__main__": | |
inputs = [ | |
{ | |
"hotel_name": "hilton london kensington", | |
"checkin_date": "2024-04-15", | |
"length_of_stay": 1 | |
}, | |
{ | |
"hotel_name": "chelsea hotel, toronto", | |
"checkin_date": "2024-04-20", | |
"length_of_stay": 1 | |
} | |
] | |
asyncio.run(execute_price_scraping_workflow(inputs)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment