Skip to content

Instantly share code, notes, and snippets.

@kagermanov27
Created May 19, 2022 13:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kagermanov27/b3b8efcdc522cf6e193dd4c7bd15f50f to your computer and use it in GitHub Desktop.
Save kagermanov27/b3b8efcdc522cf6e193dd4c7bd15f50f to your computer and use it in GitHub Desktop.
from multiprocessing.dummy import Array
from serpapi import GoogleSearch
from pydantic import BaseModel
import mimetypes
import requests
import json
import os
class Query(BaseModel):
    """Validated search parameters for one SerpApi Google Images request.

    Every field is a string; ``Download.serpapi_search`` copies them
    unchanged into the request parameters under the same names.
    """

    # Google domain the search is issued against.
    google_domain: str = "google.com"

    # Forwarded as the "num" request parameter
    # (presumably the requested result count -- confirm against SerpApi docs).
    num: str = "100"

    # Forwarded as the "ijn" request parameter
    # (presumably the zero-based results page -- confirm against SerpApi docs).
    ijn: str = "0"

    # Required search phrase; ``Download`` also uses it as the name of the
    # per-query image folder.
    q: str

    # Required SerpApi key. It can be given a default instead, e.g.
    # `api_key: str = "Your API Key"`, but avoid committing a real key.
    api_key: str
class Download:
    """Fetch Google Images results via SerpApi and save the new ones to disk.

    Links already recorded in ``<dataset_dir>/previous_images.json`` are
    skipped. Images are written to ``<dataset_dir>/test/<query.q>/`` as
    ``0.<ext>``, ``1.<ext>``, ... in download order.

    Args:
        query: Object exposing ``q``, ``ijn``, ``num``, ``google_domain`` and
            ``api_key`` attributes (normally a ``Query`` instance).
        dataset_dir: Root folder holding the history file and the ``test``
            image tree. Defaults to ``"datasets"``.
        timeout: Seconds to wait for each image request before giving up.

    Attributes:
        results: Original-image links returned by the last search.
        previous_results: Links loaded from the history file.
        unique_results: Links in ``results`` that are not in the history.
        new_results: ``[]`` until ``update_downloaded_images`` runs, then the
            ``{"previous": [...]}`` payload that was written to disk.
    """

    def __init__(self, query, dataset_dir="datasets", timeout=30):
        self.query = query
        self.dataset_dir = dataset_dir
        self.timeout = timeout
        self.results = []
        self.previous_results = []
        self.unique_results = []
        self.new_results = []

    def _history_path(self):
        """Return the path of the JSON file listing already-seen links."""
        return os.path.join(self.dataset_dir, "previous_images.json")

    def _image_dir(self):
        """Return the folder that receives images for the current query."""
        # NOTE(review): query.q is used verbatim as a folder name; a value
        # containing path separators would nest or escape this folder.
        return os.path.join(self.dataset_dir, "test", self.query.q)

    @staticmethod
    def _next_file_stem(directory):
        """Return the next free numeric file stem in *directory* as a string."""
        indices = []
        for name in os.listdir(directory):
            try:
                indices.append(int(name.split('.')[0]))
            except ValueError:
                # Ignore files that are not numbered images (e.g. ".DS_Store").
                continue
        return str(max(indices) + 1) if indices else "0"

    def serpapi_search(self):
        """Run the image search and store the original-image links.

        Raises:
            KeyError: If the response contains no ``images_results`` entry.
        """
        params = {
            "engine": "google",
            "ijn": self.query.ijn,
            "q": self.query.q,
            "google_domain": self.query.google_domain,
            "tbm": "isch",
            "num": self.query.num,
            "api_key": self.query.api_key,
        }
        response = GoogleSearch(params).get_dict()
        if "images_results" not in response:
            # Same exception type as a plain lookup would raise, but with the
            # service's own explanation attached when one is present
            # (presumably under "error" -- confirm against SerpApi docs).
            raise KeyError(
                "images_results missing from SerpApi response: {}".format(
                    response.get("error", "no error message given")
                )
            )
        # Skip entries without an "original" link rather than failing the run.
        self.results = [
            image["original"]
            for image in response["images_results"]
            if "original" in image
        ]

    def separate_unique_results(self):
        """Split ``results`` into already-seen links and new ones.

        A missing history file is treated as an empty history so that the
        first run works without any setup. Order of ``results`` is kept and
        duplicates are dropped, which makes file numbering deterministic.
        """
        try:
            with open(self._history_path(), encoding="utf-8") as f:
                history = json.load(f)
        except FileNotFoundError:
            history = {}
        self.previous_results = history.get("previous", [])
        seen = set(self.previous_results)
        self.unique_results = [
            link for link in dict.fromkeys(self.results) if link not in seen
        ]

    def download(self, link):
        """Download *link* into the query folder under the next free number.

        The file extension is guessed from the response's content type and
        falls back to ``.jpg``; HTML responses are not saved.

        Raises:
            requests.RequestException: On connection errors, timeouts, or an
                HTTP error status.
        """
        print("Downloading {}".format(link))
        target_dir = self._image_dir()
        # Creates missing parent folders too and tolerates an existing one.
        os.makedirs(target_dir, exist_ok=True)

        # Without a timeout a stalled server would block the run forever.
        r = requests.get(link, timeout=self.timeout)
        # Do not store 4xx/5xx error bodies as if they were images.
        r.raise_for_status()

        content_type = r.headers.get('content-type', '').split(';')[0].strip()
        extension = mimetypes.guess_extension(content_type)
        if extension == ".html":
            return

        file_name = self._next_file_stem(target_dir) + (extension or '.jpg')
        with open(os.path.join(target_dir, file_name), 'wb') as f:
            f.write(r.content)

    def update_downloaded_images(self):
        """Write the merged history (previous + new links) back to disk."""
        merged = list(dict.fromkeys(self.previous_results + self.unique_results))
        self.new_results = {"previous": merged}
        history_path = self._history_path()
        os.makedirs(os.path.dirname(history_path) or ".", exist_ok=True)
        with open(history_path, 'w', encoding="utf-8") as f:
            json.dump(self.new_results, f)

    def download_all_images(self):
        """Search, download every new link, then update the history file.

        A failing download is reported and skipped so one bad link does not
        abort the run. Every link in ``unique_results`` is recorded in the
        history afterwards, including those whose download failed, so failed
        links are not retried on later runs.
        """
        self.serpapi_search()
        self.separate_unique_results()
        for link in self.unique_results:
            try:
                self.download(link)
            except Exception:
                # Not a bare except: Ctrl-C and SystemExit must still stop us.
                print("Passed {}".format(link))
        self.update_downloaded_images()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment