Created
March 28, 2025 08:02
-
-
Save triposat/f13d9dd0cb9af104af280eb64209046b to your computer and use it in GitHub Desktop.
Scrape Zillow using Massive Residential Proxies
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import json
import logging
import os
from typing import Dict, List, Optional
from urllib.parse import urljoin

from dotenv import load_dotenv
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeout
# Load environment variables from a .env file. This must run before Config is
# defined, because Config reads os.getenv() in its class body.
load_dotenv()

# Configure logging for both file and console output. The log file is opened
# as UTF-8 explicitly so scraped text with non-ASCII characters is written the
# same way on every platform.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("zillow_scraper.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
def _int_from_env(name: str, default: int) -> int:
    """Return environment variable *name* as an int.

    A missing or blank value yields *default*.

    Raises:
        ValueError: if the variable is set to a non-integer value.
    """
    raw = os.getenv(name, "").strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        raise ValueError(
            f"Environment variable {name} must be an integer, got {raw!r}"
        ) from None


class Config:
    """Static scraper settings, read once from the environment at import time."""

    # Proxy settings; an empty PROXY_SERVER means no proxy is configured.
    PROXY_SERVER = os.getenv("PROXY_SERVER", "")
    PROXY_USERNAME = os.getenv("PROXY_USERNAME", "")
    PROXY_PASSWORD = os.getenv("PROXY_PASSWORD", "")

    # Value passed as `timeout=` to Playwright wait calls (milliseconds).
    TIMEOUT = _int_from_env("TIMEOUT", 10000)

    # Resource types to block for faster page loading.
    BLOCKED_RESOURCES = ["stylesheet", "image", "media", "font", "imageset"]

    # Output file configuration.
    OUTPUT_DIR = "zillow_results"
    OUTPUT_FILE = "all_listings.json"

    @classmethod
    def get_output_path(cls):
        """Return the path of the output file inside OUTPUT_DIR."""
        return os.path.join(cls.OUTPUT_DIR, cls.OUTPUT_FILE)
class ResultsSaver:
    """Load and persist the accumulated listings as a JSON array on disk."""

    @staticmethod
    def load_existing_results() -> List[Dict]:
        """Return the listings stored in the output file.

        Returns an empty list when the file does not exist, cannot be read or
        parsed, or does not contain a JSON array.
        """
        output_path = Config.get_output_path()
        try:
            with open(output_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            # No previous run: start from an empty result set.
            return []
        except Exception as e:
            # Best effort: an unreadable or corrupt file must not stop a scrape.
            logger.error(f"Error loading existing results: {e}")
            return []

        # Callers extend the returned value, so anything but a list is unusable.
        if not isinstance(data, list):
            logger.error(
                f"Error loading existing results: expected a JSON array in "
                f"{output_path}, got {type(data).__name__}"
            )
            return []
        return data

    @staticmethod
    def save_results(listings: List[Dict]):
        """Write *listings* to the output file as indented UTF-8 JSON.

        The data is written to a temporary sibling file first and then moved
        over the real file, so an interrupted write cannot truncate results
        saved earlier. Failures are logged, not raised.
        """
        output_path = Config.get_output_path()
        temp_path = f"{output_path}.tmp"
        try:
            os.makedirs(Config.OUTPUT_DIR, exist_ok=True)
            with open(temp_path, "w", encoding="utf-8") as f:
                json.dump(listings, f, indent=2, ensure_ascii=False)
            os.replace(temp_path, output_path)
            logger.info(
                f"Saved {len(listings)} listings to {output_path}"
            )
        except Exception as e:
            # Best effort by design: saving happens after every page and a
            # failed save should not abort the scrape.
            logger.error(f"Error saving results: {e}")
class ListingParser:
    """Extract structured fields from Zillow search-result property cards."""

    BASE_URL = "https://www.zillow.com"

    # Unit abbreviation shown next to a value -> key in the details dict.
    # Both the singular and plural bedroom abbreviations are accepted.
    _UNIT_FIELDS = {
        "bd": "bedrooms",
        "bds": "bedrooms",
        "ba": "bathrooms",
        "sqft": "square_feet",
    }

    _DETAILS_LIST_SELECTOR = 'ul[class*="StyledPropertyCardHomeDetailsList"]'
    _PRICE_SELECTOR = 'span[data-test="property-card-price"]'

    @staticmethod
    def clean_text(text: Optional[str]) -> str:
        """Collapse runs of whitespace in *text*; return "" for None or empty."""
        return " ".join(text.split()) if text else ""

    @staticmethod
    async def extract_listing_details(listing) -> Optional[Dict]:
        """Return the details of one property card, or None if it is unusable.

        Args:
            listing: element handle of a property card.

        Returns:
            A dict with the keys address, status, price, bedrooms, bathrooms,
            square_feet, listing_company and url. Returns None when the card
            has no data container, no non-blank address, or extraction fails.
        """
        try:
            data_container = await listing.query_selector(
                'div[class*="property-card-data"]'
            )
            if not data_container:
                return None

            # The address is mandatory; cards without one are skipped.
            address_elem = await data_container.query_selector(
                'address[data-test="property-card-addr"]'
            )
            if not address_elem:
                return None
            address = ListingParser.clean_text(await address_elem.text_content())
            if not address:
                return None

            details = {
                "address": address,
                "status": None,
                "price": None,
                "bedrooms": None,
                "bathrooms": None,
                "square_feet": None,
                "listing_company": None,
                "url": None,
            }

            if price_elem := await data_container.query_selector(
                ListingParser._PRICE_SELECTOR
            ):
                details["price"] = ListingParser.clean_text(
                    await price_elem.text_content()
                )

            # Bedrooms, bathrooms, square footage and status share one area.
            if details_area := await data_container.query_selector(
                'div[class*="StyledPropertyCardDataArea"]'
                f":has({ListingParser._DETAILS_LIST_SELECTOR})"
            ):
                await ListingParser._parse_property_details(details_area, details)

            await ListingParser._parse_listing_company(data_container, details)

            if url_elem := await data_container.query_selector(
                'a[data-test="property-card-link"]'
            ):
                href = await url_elem.get_attribute("href")
                if href and href.strip():
                    # urljoin leaves absolute URLs untouched and resolves
                    # relative ones (with or without a leading slash).
                    details["url"] = urljoin(ListingParser.BASE_URL, href.strip())

            return details
        except Exception as e:
            # One malformed card must not abort the whole page.
            logger.error(f"Error extracting listing details: {e}")
            return None

    @staticmethod
    async def _parse_property_details(details_area, details: Dict):
        """Fill status, bedrooms, bathrooms and square_feet in *details*.

        The status is the text after the last " - " in the area's text. The
        other fields come from list items pairing a <b> value with an <abbr>
        unit.
        """
        # text_content() may return None; treat that as "no text".
        full_text = (await details_area.text_content() or "").strip()
        if " - " in full_text:
            details["status"] = full_text.rsplit(" - ", 1)[-1].strip()

        details_list = await details_area.query_selector(
            ListingParser._DETAILS_LIST_SELECTOR
        )
        if not details_list:
            return

        for item in await details_list.query_selector_all("li"):
            bold_elem = await item.query_selector("b")
            abbr_elem = await item.query_selector("abbr")
            if not (bold_elem and abbr_elem):
                continue

            value = ListingParser.clean_text(await bold_elem.text_content())
            unit = ListingParser.clean_text(await abbr_elem.text_content()).lower()
            field = ListingParser._UNIT_FIELDS.get(unit)
            if not value or field is None:
                continue

            # Square footage is stored without thousands separators.
            details[field] = value.replace(",", "") if field == "square_feet" else value

    @staticmethod
    async def _parse_listing_company(data_container, details: Dict):
        """Set details["listing_company"] from the first suitable data area.

        A data area qualifies when it has non-blank text, contains neither the
        price element nor the home-details list, and its text has no "- "
        (which marks the status line).
        """
        candidates = await data_container.query_selector_all(
            'div[class*="StyledPropertyCardDataArea"]'
        )
        for elem in candidates:
            company_text = ListingParser.clean_text(await elem.text_content())
            if not company_text or "- " in company_text:
                continue
            if await elem.query_selector(ListingParser._PRICE_SELECTOR):
                continue
            # A details area without a status has no "- " either; exclude it
            # explicitly so beds/baths text is never taken for the company.
            if await elem.query_selector(ListingParser._DETAILS_LIST_SELECTOR):
                continue
            details["listing_company"] = company_text
            break
class AsyncZillowSearchScraper:
    """Scrape Zillow search result pages with Playwright.

    Use as an async context manager: entering starts Playwright, launches
    Chromium (through the configured proxy, if any) and installs resource
    blocking; exiting closes everything again. Listings accumulate in
    ``all_listings``, which starts from the results already saved on disk.
    """

    LISTING_SELECTOR = 'article[data-test="property-card"]'
    MAX_SCROLL_ATTEMPTS = 20
    WAIT_ATTEMPTS = 3

    def __init__(self, headless: bool = True):
        self.headless = headless
        self.playwright = None
        self.browser = None
        self.context = None
        loaded = ResultsSaver.load_existing_results()
        # Merging relies on list semantics; ignore anything else.
        self.all_listings = loaded if isinstance(loaded, list) else []

    async def __aenter__(self):
        """Start Playwright, launch the browser and create a routed context."""
        self.playwright = await async_playwright().start()
        try:
            browser_config = {"headless": self.headless}
            # Configure proxy settings if provided
            if Config.PROXY_SERVER:
                browser_config["proxy"] = {
                    "server": Config.PROXY_SERVER,
                    "username": Config.PROXY_USERNAME,
                    "password": Config.PROXY_PASSWORD,
                }
            self.browser = await self.playwright.chromium.launch(**browser_config)
            self.context = await self.browser.new_context()
            await self.context.route("**/*", self._route_handler)
        except BaseException:
            # __aexit__ is not called when __aenter__ fails, so release
            # whatever was already started before re-raising.
            await self._shutdown()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the context, the browser and Playwright."""
        await self._shutdown()

    async def _shutdown(self):
        """Release context, browser and Playwright, in that order.

        Resources that were never created are skipped, and a failure while
        closing one resource is logged so the remaining ones are still closed.
        """
        for attr, method in (
            ("context", "close"),
            ("browser", "close"),
            ("playwright", "stop"),
        ):
            resource = getattr(self, attr)
            setattr(self, attr, None)
            if resource is None:
                continue
            try:
                await getattr(resource, method)()
            except Exception as e:
                logger.warning(f"Error closing {attr}: {e}")

    async def _route_handler(self, route):
        """Abort requests whose resource type is in Config.BLOCKED_RESOURCES."""
        if route.request.resource_type in Config.BLOCKED_RESOURCES:
            await route.abort()
        else:
            await route.continue_()

    async def _wait_for_listings(self, page):
        """Wait for list items to appear; return False after all attempts time out."""
        for attempt in range(self.WAIT_ATTEMPTS):
            try:
                await page.wait_for_selector(
                    'li[class*="ListItem"]', timeout=Config.TIMEOUT
                )
                return True
            except PlaywrightTimeout:
                # Pause only when another attempt follows.
                if attempt < self.WAIT_ATTEMPTS - 1:
                    logger.warning(
                        f"Retry {attempt + 1} waiting for listings..."
                    )
                    await asyncio.sleep(2)
        return False

    async def _scroll_and_get_listings(self, page):
        """Scroll the page until no new property cards load; return all cards.

        Scrolling stops when the card count stays unchanged across a short
        grace period, or after MAX_SCROLL_ATTEMPTS rounds.
        """
        last_listing_count = 0
        for _ in range(self.MAX_SCROLL_ATTEMPTS):
            listings = await page.query_selector_all(self.LISTING_SELECTOR)
            current_listing_count = len(listings)

            if current_listing_count == last_listing_count and current_listing_count > 0:
                # Nothing new since the previous round: give lazy loading one
                # more chance before concluding the list is complete.
                await asyncio.sleep(2)
                listings = await page.query_selector_all(self.LISTING_SELECTOR)
                if len(listings) == current_listing_count:
                    break
            else:
                last_listing_count = current_listing_count

            await page.evaluate("window.scrollBy(0, 500)")
            try:
                # Wait for additional listings to load dynamically
                await page.wait_for_function(
                    f"document.querySelectorAll('{self.LISTING_SELECTOR}').length > "
                    f"{current_listing_count}",
                    timeout=Config.TIMEOUT,
                )
            except PlaywrightTimeout:
                logger.debug("Timeout waiting for new listings to load.")

            # Scroll to the last listing to trigger more listings to load
            if listings:
                try:
                    await listings[-1].scroll_into_view_if_needed()
                except Exception as e:
                    logger.debug(f"Scroll into view failed: {e}")

            await asyncio.sleep(3)

        final_listings = await page.query_selector_all(self.LISTING_SELECTOR)
        logger.info(f"Found {len(final_listings)} listings")
        return final_listings

    @staticmethod
    def _listing_key(listing) -> Optional[str]:
        """Return the identity of a listing: its url, else its address, else None."""
        if not isinstance(listing, dict):
            return None
        for field in ("url", "address"):
            value = listing.get(field)
            if isinstance(value, str) and value:
                return value
        return None

    def _merge_listings(self, new_listings: List[Dict]) -> int:
        """Merge *new_listings* into ``all_listings`` without duplicating entries.

        A listing whose key (url, falling back to address) is already present
        replaces the stored record in place; anything else is appended.

        Returns:
            The number of listings that were appended.
        """
        index = {}
        for position, existing in enumerate(self.all_listings):
            key = self._listing_key(existing)
            if key is not None:
                index.setdefault(key, position)

        added = 0
        for listing in new_listings:
            key = self._listing_key(listing)
            if key is not None and key in index:
                # Already known (earlier page or earlier run): refresh it.
                self.all_listings[index[key]] = listing
                continue
            if key is not None:
                index[key] = len(self.all_listings)
            self.all_listings.append(listing)
            added += 1
        return added

    async def scrape_search_results(
        self, url: str, max_pages: Optional[int] = None
    ) -> List[Dict]:
        """Scrape the search results starting at *url*.

        Args:
            url: search results page to open first.
            max_pages: maximum number of result pages to process; None
                processes pages until no enabled "Next page" link is found.

        Returns:
            ``all_listings``: the previously saved listings merged with the
            ones scraped by this call. Results are saved after every page.

        Raises:
            RuntimeError: if called before the scraper has been entered with
                ``async with``.
        """
        if self.context is None:
            raise RuntimeError(
                "Browser context is not initialised; use "
                "'async with AsyncZillowSearchScraper() as scraper'"
            )

        page = await self.context.new_page()
        current_page = 1
        try:
            # Load the initial search results page
            await page.goto(url, wait_until="domcontentloaded")
            await asyncio.sleep(5)

            while True:
                # Stop before doing any work once the page limit is exceeded
                # (reachable when max_pages is zero or negative).
                if max_pages is not None and current_page > max_pages:
                    logger.info(
                        f"Reached specified limit of {max_pages} pages"
                    )
                    break

                logger.info(
                    f"Scraping page {current_page}"
                    + (
                        f" of {max_pages}"
                        if max_pages is not None
                        else " (scraping all pages)"
                    )
                )

                if current_page == 1:
                    await page.evaluate("window.scrollTo(0, 0)")
                    await asyncio.sleep(2)
                await page.wait_for_selector(
                    'div[id="search-page-list-container"]', timeout=Config.TIMEOUT
                )

                if not await self._wait_for_listings(page):
                    logger.warning("Listings did not appear; stopping")
                    break

                # Extract listings from the current page
                cards = await self._scroll_and_get_listings(page)
                page_listings = []
                for card in cards:
                    if details := await ListingParser.extract_listing_details(card):
                        page_listings.append(details)

                added = self._merge_listings(page_listings)
                ResultsSaver.save_results(self.all_listings)
                logger.info(f"Processed {len(page_listings)} listings from page {current_page}")
                logger.info(f"Total listings so far: {len(self.all_listings)}")
                logger.debug(f"{added} of them were not seen before")

                # Navigate to the next page of listings if available
                try:
                    next_button = await page.query_selector(
                        'a[title="Next page"]:not([disabled])'
                    )
                    if not next_button:
                        logger.info("No more pages available")
                        break
                    if max_pages is not None and current_page >= max_pages:
                        logger.info(
                            f"Reached specified limit of {max_pages} pages"
                        )
                        break
                    await next_button.click()
                    current_page += 1
                    await asyncio.sleep(5)
                    await page.wait_for_selector(
                        self.LISTING_SELECTOR,
                        timeout=Config.TIMEOUT,
                    )
                except Exception as e:
                    logger.error(f"Navigation error: {str(e)}")
                    break
        except Exception as e:
            # Keep whatever was collected so far; the caller still gets it.
            logger.error(f"Error during scraping: {e}")
        finally:
            # Close the page once scraping is complete
            await page.close()
        return self.all_listings
async def main():
    """Scrape the Chicago search results and print a short summary.

    Prints the total number of listings returned by the scraper and, when
    there is at least one, the first listing as indented JSON.
    """
    search_url = "https://www.zillow.com/chicago-il/"
    max_pages = None  # None: keep going until there is no next page

    async with AsyncZillowSearchScraper(headless=False) as scraper:
        results = await scraper.scrape_search_results(search_url, max_pages=max_pages)

    print(f"\nTotal listings scraped: {len(results)}")
    if results:
        print("\nSample listing:")
        print(json.dumps(results[0], indent=2, ensure_ascii=False))
if __name__ == "__main__":
    # Run the async entry point only when executed as a script.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment