Skip to content

Instantly share code, notes, and snippets.

@ripiuk
Last active June 8, 2019 17:47
Show Gist options
  • Save ripiuk/326b7dcf25b162f2004c293842c92600 to your computer and use it in GitHub Desktop.
Save ripiuk/326b7dcf25b162f2004c293842c92600 to your computer and use it in GitHub Desktop.
Search for images on Google and parse the resulting HTML page
import os
import time
import uuid
import asyncio
import typing as typ
import urllib.parse as urlparse
from lxml import html
from aiohttp import ClientSession
# Search term sent to Google; it also names the download sub-directory.
SEARCH = "Dogs"
# Directory the downloaded images are written to.
DOWNLOAD_DIR = f"imgs/{SEARCH}"
# Number of result pages requested from the image search.
PAGES = 5
def custom_search() -> typ.List[str]:
    """Placeholder for a lookup based on the Google Custom Search API.

    Not implemented yet: the body is empty, so a call currently returns
    ``None`` rather than the annotated list of strings.

    Reference: https://developers.google.com/custom-search/
    """
    return None
async def download_imgs(imgs: typ.List[str], session) -> None:
    """Download all image URLs concurrently into ``DOWNLOAD_DIR``.

    Every successfully fetched image is written to a file named with a random
    UUID and a ``.jpg`` extension (the extension is fixed, it is not derived
    from the response).

    Args:
        imgs: Image URLs to fetch. ``None`` and empty entries are skipped.
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with a ``status`` attribute and an awaitable
            ``read()``; in this file it is an aiohttp ``ClientSession``.

    Raises:
        Whatever the request or the file write raises; the first such
        exception propagates out of ``asyncio.gather``.
    """
    # exist_ok avoids the race between an os.path.exists() check and makedirs().
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)

    async def _download_image(img_url: str) -> None:
        """Fetch a single URL and store its body as a new file."""
        async with session.get(img_url) as response:
            if response.status >= 400:
                # Do not store an HTTP error page as if it were an image.
                print(f"Skipping {img_url}: HTTP {response.status}")
                return
            img_data = await response.read()
        # The response is released before the (blocking) disk write starts.
        with open(f"{DOWNLOAD_DIR}/{uuid.uuid4()}.jpg", 'wb') as file:
            file.write(img_data)

    # gather() schedules the coroutines itself; falsy URLs (e.g. an <img>
    # without a src attribute) cannot be requested and are dropped here.
    await asyncio.gather(*(_download_image(img) for img in imgs if img))
async def parse_html(session, what_to_search: str, pages: int = 1) -> typ.List[str]:
    """Collect image URLs from Google image-search result pages.

    Requests ``pages`` consecutive result pages (20 results apart) and gathers
    the ``src`` attribute of every ``<img>`` found inside the first
    ``table.images_table`` element of each page.

    Args:
        session: Object whose ``get(url, headers=...)`` returns an async
            context manager yielding a response with an awaitable ``text()``;
            in this file it is an aiohttp ``ClientSession``.
        what_to_search: Search query, sent as the ``q`` parameter.
        pages: Number of result pages to request.

    Returns:
        The non-empty ``src`` values found, in page order. If a page has no
        images table, paging stops there and the images collected from the
        earlier pages are still returned. An empty list means nothing was
        found; "No images found" is printed in that case.
    """
    base_url = "https://www.google.com.ua/search?"
    # The headers are identical for every page, so build them once.
    # NOTE(review): presumably this user-agent is what makes Google serve the
    # table-based markup selected below — confirm before changing it.
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,"
                  "*/*;q=0.8,application/signed-exchange;v=b3",
        "accept-language": "uk-UA,uk;q=0.9,ru;q=0.8,en-US;q=0.7,en;q=0.6",
        "referer": "https://www.google.com.ua/",
        "upgrade-insecure-requests": "1",
        "user-agent": "python-requests/2.22.0",
    }
    imgs = []  # type: typ.List[html.HtmlElement]

    for page_index in range(pages):
        params = {
            "q": what_to_search,
            "authuser": "0",
            "hl": "uk",
            "biw": "963",
            "bih": "983",
            "ie": "UTF-8",
            "tbm": "isch",
            "ei": "SMnyXNHKFruBk74Pn_2v4Aw",
            # Result offset: 0 - first page, 20 - second page, ...
            "start": str(page_index * 20),
            "sa": "N",
        }
        url = base_url + urlparse.urlencode(params)

        async with session.get(url, headers=headers) as response:
            resp = await response.text()

        page = html.fromstring(resp)  # type: html.HtmlElement
        tables = page.cssselect("table.images_table")
        if not tables:
            # No results table on this page: stop paging, but keep the images
            # already collected instead of discarding them.
            break
        imgs += tables[0].cssselect("img")

    if not imgs:
        print("No images found")

    # An <img> without a src attribute yields None; keep only real URLs so the
    # result matches the annotated List[str].
    sources = (img.attrib.get("src") for img in imgs)
    return [src for src in sources if src]
def content_type_jpeg() -> typ.List[str]:
    """Placeholder that is not implemented yet.

    The body is empty, so a call currently returns ``None`` rather than the
    annotated list of strings.
    NOTE(review): presumably meant to select images by JPEG content type —
    confirm the intent before implementing.
    """
    return None
async def main():
    """Search Google Images for ``SEARCH`` and download every image found.

    A single ``ClientSession`` is shared by the search requests and by the
    image downloads and is closed when both steps have finished.
    """
    # No semaphore is used here: one acquisition wrapped around both calls by
    # the only running task would never block anything, so it would not bound
    # the number of concurrent downloads.
    async with ClientSession() as session:
        images = await parse_html(session, SEARCH, pages=PAGES)
        await download_imgs(images, session)
if __name__ == "__main__":
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments.
    start = time.perf_counter()
    # asyncio.run() creates the event loop, runs main() to completion and
    # closes the loop; calling get_event_loop()/ensure_future() with no
    # running loop is deprecated.
    asyncio.run(main())
    # Author's measurements: 0.8931670188903809 - 1 page, 3.6354501247406006 - 5 pages
    print("Time:", time.perf_counter() - start)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment