@triposat
Created October 13, 2024 17:57
Google Images Scraper - ScrapingAnt
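
A quick setup note (not part of the original gist, inferred from the imports below): the script relies on the playwright and aiohttp packages, and Playwright needs a browser binary installed before first use.

# Suggested setup, assuming pip and Playwright's CLI are available:
#   pip install playwright aiohttp
#   playwright install chromium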
import asyncio
import json
import os
import shutil
from aiohttp import ClientSession, ClientTimeout
from urllib.parse import urlparse, urlencode
from playwright.async_api import async_playwright


# Function to extract the domain from a URL
def extract_domain(url):
    """
    Extract the domain from the given URL.
    If the domain starts with 'www.', it removes it.

    Args:
        url (str): The URL to extract the domain from.

    Returns:
        str: The extracted domain.
    """
    domain = urlparse(url).netloc
    if domain.startswith("www."):
        domain = domain[4:]
    return domain


# Function to download an image with retry logic
async def download_image(session, img_url, file_path, retries=3):
    """
    Download an image from the given URL and save it to the specified file path.
    If the download fails, it retries up to the specified number of attempts.

    Args:
        session (ClientSession): The aiohttp session to use for downloading.
        img_url (str): The URL of the image to download.
        file_path (str): The path to save the downloaded image.
        retries (int, optional): The maximum number of download attempts. Defaults to 3.

    Returns:
        None
    """
    attempt = 0
    while attempt < retries:
        try:
            # Attempt to download the image
            async with session.get(img_url) as response:
                if response.status == 200:
                    # Write the image content to the file
                    with open(file_path, "wb") as f:
                        f.write(await response.read())
                    print(f"Downloaded image to: {file_path}")
                    return
                else:
                    print(f"Failed to download image from {img_url}. Status: {response.status}")
        except Exception as e:
            print(f"Error downloading image from {img_url}: {e}")
        attempt += 1
        # Retry if the maximum number of attempts has not been reached
        if attempt < retries:
            print(f"Retrying download for {img_url} (attempt {attempt + 1}/{retries})")
            await asyncio.sleep(2 ** attempt)  # Exponential backoff for retries
    print(f"Failed to download image from {img_url} after {retries} attempts.")


# Function to scroll to the bottom of the page
async def scroll_to_bottom(page):
    """
    Scroll to the bottom of the web page using Playwright.

    Args:
        page (Page): The Playwright page object to scroll.

    Returns:
        None
    """
    print("Scrolling...")
    previous_height = await page.evaluate("document.body.scrollHeight")
    while True:
        # Scroll to the bottom of the page
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await asyncio.sleep(1)
        new_height = await page.evaluate("document.body.scrollHeight")
        if new_height == previous_height:
            break
        previous_height = new_height
    print("Reached the bottom of the page.")


# Main function to scrape Google Images
async def scrape_google_images(search_query="macbook m3", max_images=None, timeout_duration=10):
    """
    Scrape images from Google Images for a given search query.

    Args:
        search_query (str, optional): The search term to use for Google Images. Defaults to "macbook m3".
        max_images (int, optional): The maximum number of images to download. If None, downloads all available. Defaults to None.
        timeout_duration (int, optional): The timeout duration for the image download session. Defaults to 10 seconds.

    Returns:
        None
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=False)  # Launch a Chromium browser
        page = await browser.new_page()  # Open a new browser page

        # Build the Google Images search URL with the query
        query_params = urlencode({"q": search_query, "tbm": "isch"})
        search_url = f"https://www.google.com/search?{query_params}"
        print(f"Navigating to search URL: {search_url}")
        await page.goto(search_url)  # Navigate to the search results page

        # Scroll to the bottom of the page to load more images
        await scroll_to_bottom(page)
        await page.wait_for_selector('div[data-id="mosaic"]')  # Wait for the image section to appear

        # Set up directories for image storage
        download_folder = "downloaded_images"
        json_file_path = "google_images_data.json"
        if os.path.exists(download_folder):
            # Prompt the user whether to delete or archive the existing folder
            user_input = input(f"The folder '{download_folder}' already exists. Do you want to delete it? (yes/no): ")
            if user_input.lower() == "yes":
                print(f"Removing existing folder: {download_folder}")
                shutil.rmtree(download_folder)
            else:
                archive_folder = f"{download_folder}_archive"
                print(f"Archiving existing folder to: {archive_folder}")
                shutil.move(download_folder, archive_folder)
        os.makedirs(download_folder)  # Create a new folder to store the images

        # Initialize the JSON file to store image metadata
        with open(json_file_path, "w") as json_file:
            json.dump([], json_file)

        # Find all image elements on the page
        image_elements = await page.query_selector_all('div[data-attrid="images universal"]')
        print(f"Found {len(image_elements)} image elements on the page.")

        async with ClientSession(timeout=ClientTimeout(total=timeout_duration)) as session:
            images_downloaded = 0
            image_data_list = []

            # Iterate through the image elements
            for idx, image_element in enumerate(image_elements):
                if max_images is not None and images_downloaded >= max_images:
                    print(f"Reached max image limit of {max_images}. Stopping download.")
                    break
                try:
                    print(f"Processing image {idx + 1}...")
                    # Click on the image to get a full view
                    await image_element.click()
                    await page.wait_for_selector("img.sFlh5c.FyHeAf.iPVvYb[jsaction]")
                    img_tag = await page.query_selector("img.sFlh5c.FyHeAf.iPVvYb[jsaction]")
                    if not img_tag:
                        print(f"Failed to find image tag for element {idx + 1}")
                        continue

                    # Get the image URL
                    img_url = await img_tag.get_attribute("src")
                    file_extension = os.path.splitext(urlparse(img_url).path)[1] or ".png"
                    file_path = os.path.join(download_folder, f"image_{idx + 1}{file_extension}")

                    # Download the image
                    await download_image(session, img_url, file_path)

                    # Extract source URL and image description (the explicit "xpath="
                    # prefix is needed because Playwright only auto-detects XPath
                    # selectors that start with "//")
                    source_url = await page.query_selector('xpath=(//div[@jsname="figiqf"]/a[@class="YsLeY"])[2]')
                    source_url = await source_url.get_attribute("href") if source_url else "N/A"
                    image_description = await img_tag.get_attribute("alt")
                    source_name = extract_domain(source_url)

                    # Store image metadata
                    image_data = {
                        "image_description": image_description,
                        "source_url": source_url,
                        "source_name": source_name,
                        "image_file": file_path,
                    }
                    image_data_list.append(image_data)
                    print(f"Image {idx + 1} metadata prepared.")
                    images_downloaded += 1
                except Exception as e:
                    print(f"Error processing image {idx + 1}: {e}")
                    continue

            # Save image metadata to a JSON file
            with open(json_file_path, "w") as json_file:
                json.dump(image_data_list, json_file, indent=4)
            print(f"Finished downloading {images_downloaded} images.")

        await browser.close()  # Close the browser when done


# Run the main function with the specified query and limits
if __name__ == "__main__":
    asyncio.run(scrape_google_images(search_query="iphone 16 pro", max_images=10, timeout_duration=10))
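
# Note: the class names and jsname attributes targeted above (sFlh5c, FyHeAf,
# iPVvYb, figiqf, YsLeY) are machine-generated identifiers that Google rotates
# periodically, so these selectors may need refreshing if the page layout
# changes.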