@nehiljain
Last active January 28, 2024 21:46
Google Hotel Search Scraper V1
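
This gist has two pieces: a GPT-4 Vision reader (instructor + OpenAI) that turns a screenshot of Google hotel search results into structured provider/currency/rate records, and a Playwright scraper that captures those screenshots and the raw price text for a given hotel and check-in date.
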
import instructor
import base64
import logging
import os
from openai import AsyncOpenAI
import asyncio
from pydantic import Field, BaseModel
from typing import List
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
# Add logger
logging.basicConfig()
logger = logging.getLogger("app")
logger.setLevel("INFO")


class Rate(BaseModel):
    """A single hotel rate: provider, currency, and price."""

    provider: str = Field(description="Provider name", example="Booking.com")
    currency: str = Field(description="Currency", example="USD")
    rate: float = Field(description="Rate", example=110.0)


class RateList(BaseModel):
    """List of hotel rates. Each rate is [PROVIDER, CURRENCY, RATE]."""

    rates: List[Rate] = Field(
        description="List of Hotel Rates. Each rate is a list of 3 elements: [PROVIDER, CURRENCY, RATE]",
        example=[
            Rate(provider="Booking.com", currency="USD", rate=110.0),
            Rate(provider="Expedia", currency="USD", rate=90.0),
        ],
    )
    error: bool = Field(default=False, description="Error flag. True if there was an error, False otherwise.")


def encode_image(image_path) -> str:
    """Encode image to base64 string."""
    with open(image_path, "rb") as image_file:
        encoded_string = base64.b64encode(image_file.read())
    return encoded_string.decode("utf-8")


client_image = instructor.patch(
    AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")),
    mode=instructor.Mode.MD_JSON,
)


async def read_prices(image_file_path: str, hotel_name: str) -> RateList:
    """Read prices from a screenshot of Google hotel search results."""
    encoded_string = encode_image(image_file_path)
    return await client_image.chat.completions.create(
        model="gpt-4-vision-preview",
        response_model=RateList,
        max_tokens=4000,
        temperature=0.0,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": f"Find all the prices from the attached picture of Google hotel search results for {hotel_name}. Each price should have an associated provider as well.",
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{encoded_string}"},
                    },
                ],
            }
        ],
    )


async def main():
    # Example invocation; read_prices needs both the screenshot path and the hotel name.
    result = await read_prices("full_page_screenshot.png", "Ace Hotel, NYC")
    logger.info(result)
    return result


if __name__ == "__main__":
    asyncio.run(main())
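
The second script in the gist, below, does the scraping itself: it drives Google's hotel search with Playwright, collects the raw provider/price text, and saves the full-page screenshots that read_prices above consumes.
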
import asyncio
from playwright.async_api import async_playwright, TimeoutError
from pathlib import Path
import re
import pendulum
from typing import List
from pydantic import BaseModel, Field
from dagster import get_dagster_logger

# Limit concurrent browser sessions and reuse the Dagster logger
semaphore = asyncio.Semaphore(3)
logger = get_dagster_logger()


class Price(BaseModel):
    """Raw hotel rate text scraped from the metasearch results page."""

    text: str = Field(description="Hotel rate text scraped from the metasearch results page", example="Booking.com $110 USD")
    hotel_name: str = Field(description="Hotel name", example="Ace Hotel, NYC")
    checkin_date: str = Field(description="Checkin date", example="2024-04-12")
    length_of_stay: int = Field(description="Length of stay", example=1)
    scrapped_url: str = Field(description="URL of the scraped page", example="https://www.google.com/travel/search")


async def fetch_google_hotel_prices_desktop(hotel_name, checkin_date, screenshot_dpath, length_of_stay=1) -> List[dict]:
    logger.debug(f"Starting desktop price fetch for hotel: {hotel_name}, check-in: {checkin_date}, stay length: {length_of_stay} days")
    async with semaphore:
        async with async_playwright() as playwright:
            prices = []
            chromium = playwright.chromium
            browser = await chromium.launch(
                headless=True,
            )
            page = await browser.new_page()
            logger.debug("Navigating to https://www.google.com/travel/search")
            try:
                await page.goto("https://www.google.com/travel/search")
                await page.get_by_role("combobox", name="Search for places, hotels and more").click()
                await page.get_by_role("combobox", name="Search for places, hotels and more").fill(hotel_name)
                await page.get_by_role("combobox", name="Search for places, hotels and more").press("Enter")
                logger.debug("Waiting for page load for hotel name %s checkin date %s and length of stay %s", hotel_name, checkin_date, length_of_stay)
                await asyncio.sleep(5)
                # Sanity checks that the hotel detail page (or a list item for the hotel) is showing
                is_right_page = (
                    await page.get_by_label("Reviews", exact=True).first.is_visible()
                    and await page.get_by_label("About", exact=True).first.is_visible()
                    and await page.get_by_label("Overview", exact=True).first.is_visible()
                )
                is_list_item_shown = await page.get_by_label(hotel_name, exact=True).is_visible()
                logger.debug("Right page shown: %s, List item shown: %s", is_right_page, is_list_item_shown)
                checkin_picker = page.get_by_role("textbox", name=re.compile("Check-in", re.IGNORECASE))
                checkout_picker = page.get_by_role("textbox", name=re.compile("Check-out", re.IGNORECASE))
                orig_value = await checkin_picker.input_value()
                logger.debug("Original checkin date: %s", orig_value)
                dest_checkin_date = pendulum.parse(checkin_date)
                await checkin_picker.fill(dest_checkin_date.format('ddd, MMM D'))
                await asyncio.sleep(4)
                await checkout_picker.fill((dest_checkin_date + pendulum.duration(days=length_of_stay)).format('ddd, MMM D'))
                await checkout_picker.press("Enter")
                logger.debug("New checkin date: %s and checkout date: %s", await checkin_picker.input_value(), await checkout_picker.input_value())
                await asyncio.sleep(2)
                logger.debug("New checkin date: %s and checkout date: %s", await checkin_picker.input_value(), await checkout_picker.input_value())
                # Keep expanding the provider list until the "Fewer options" toggle is visible
                while not await page.get_by_role("button", name=re.compile("Fewer Options", re.IGNORECASE)).is_visible():
                    await page.get_by_role("button", name=re.compile(r"View more options from \$\d+", re.IGNORECASE)).click()
                    await asyncio.sleep(6)
                logger.debug("All prices loaded")
                all_prices = await page.get_by_role("link", name=re.compile(r"\w+(\s\w+)* \$\d+ \w+(\s\w+)*")).all()
                logger.debug("Found %s prices for hotel %s", len(all_prices), hotel_name)
                for price in all_prices:
                    text = await price.all_inner_texts()
                    # Rules to clean up the text
                    text = re.sub(re.escape('View site'), '', text[0], flags=re.IGNORECASE)
                    text = text.replace('\n', ';;')
                    price_obj = Price(text=text + ";;desktop", hotel_name=hotel_name, checkin_date=checkin_date, length_of_stay=length_of_stay, scrapped_url=page.url)
                    logger.debug("Price object: %s", dict(price_obj))
                    prices.append(dict(price_obj))
                sanitized_hotel_name = re.sub(r'[^A-Za-z0-9]', '-', hotel_name)
                screenshot_filepath = Path(screenshot_dpath, f'desktop_full_page_screenshot__{sanitized_hotel_name}__{dest_checkin_date.format("YYYYMMDD")}__{pendulum.now("UTC").format("YYYYMMDDHHmm")}.png')
                logger.info(f"Saving screenshot to {screenshot_filepath}")
                await page.screenshot(path=screenshot_filepath, full_page=True)
                await browser.close()
                logger.debug("Closed browser")
                return prices
            except TimeoutError as e:
                logger.error(f"Failed to load page for hotel {hotel_name} with checkin date {checkin_date} and length of stay {length_of_stay}: {e}")
                return [{
                    'hotel_name': hotel_name,
                    'checkin_date': checkin_date,
                    'length_of_stay': length_of_stay,
                    'scrapped_url': page.url,
                    'text': 'Failed to load page',
                }]


async def execute_price_scraping_workflow(inputs):
    # screenshot_dpath='.' is an assumption here; point it at whichever directory should hold the screenshots.
    tasks = (fetch_google_hotel_prices_desktop(hotel_name=inp['hotel_name'], checkin_date=inp['checkin_date'], screenshot_dpath='.', length_of_stay=inp.get('length_of_stay', 1)) for inp in inputs)
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    inputs = [
        {
            "hotel_name": "hilton london kensington",
            "checkin_date": "2024-04-15",
            "length_of_stay": 1,
        },
        {
            "hotel_name": "chelsea hotel, toronto",
            "checkin_date": "2024-04-20",
            "length_of_stay": 1,
        },
    ]
    asyncio.run(execute_price_scraping_workflow(inputs))
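
A minimal sketch of how the two scripts could be chained end to end, assuming they are importable as modules (google_hotels_scraper and price_reader are hypothetical names; adjust to wherever the gist files actually live) and that OPENAI_API_KEY is set:

import asyncio
import re
from pathlib import Path

# Hypothetical module names for the two gist files above.
from google_hotels_scraper import fetch_google_hotel_prices_desktop
from price_reader import read_prices


async def scrape_and_extract(hotel_name: str, checkin_date: str, screenshot_dpath: str = ".") -> None:
    # Step 1: scrape Google hotel search and save a full-page screenshot.
    raw_prices = await fetch_google_hotel_prices_desktop(hotel_name, checkin_date, screenshot_dpath)
    print(raw_prices)

    # Step 2: feed the most recent screenshot for this hotel into the GPT-4 Vision reader.
    sanitized = re.sub(r"[^A-Za-z0-9]", "-", hotel_name)
    screenshots = sorted(Path(screenshot_dpath).glob(f"desktop_full_page_screenshot__{sanitized}__*.png"))
    if screenshots:
        rate_list = await read_prices(str(screenshots[-1]), hotel_name)
        print(rate_list.rates)


if __name__ == "__main__":
    asyncio.run(scrape_and_extract("chelsea hotel, toronto", "2024-04-20"))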