-
-
Save kagermanov27/b3b8efcdc522cf6e193dd4c7bd15f50f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing.dummy import Array | |
from serpapi import GoogleSearch | |
from pydantic import BaseModel | |
import mimetypes | |
import requests | |
import json | |
import os | |
class Query(BaseModel):
    """Validated parameter set for a SerpApi Google Images search."""

    # Google domain to route the search through.
    google_domain: str = "google.com"
    # Results requested per page (SerpApi expects this as a string).
    num: str = "100"
    # Page index for paginating image results — presumably SerpApi's
    # "ijn" pagination parameter; confirm against the SerpApi docs.
    ijn: str = "0"
    # Search term (required).
    q: str
    # SerpApi key (required). You may replace this with
    # `api_key: str = "Your API Key"` to bake in a default.
    api_key: str
class Download:
    """Download Google Images results fetched via SerpApi, skipping links
    already downloaded in earlier runs.

    Expects the working directory to contain:
      - datasets/previous_images.json  -- JSON ledger: {"previous": [<url>, ...]}
      - datasets/test/                 -- per-query image folders are created here
        (created on demand by ``download`` if missing)
    """

    def __init__(self, query):
        # `query` is read attribute-wise (q, num, ijn, google_domain, api_key),
        # so any object with those fields works — typically a Query model.
        self.query = query
        self.results = []            # image URLs returned by the current search
        self.previous_results = []   # URLs recorded by earlier runs
        self.unique_results = []     # current URLs not seen before
        self.new_results = []        # payload written back to the JSON ledger

    def serpapi_search(self):
        """Run a Google Images search and store the original-size image URLs."""
        params = {
            "engine": "google",
            "ijn": self.query.ijn,
            "q": self.query.q,
            "google_domain": self.query.google_domain,
            "tbm": "isch",
            "num": self.query.num,
            "api_key": self.query.api_key,
        }
        search = GoogleSearch(params)
        response = search.get_dict()
        # Guard against responses without image results (e.g. errors/quota)
        # and entries that lack an "original" link — the original code
        # raised KeyError in both cases.
        image_results = response.get('images_results', [])
        self.results = [x['original'] for x in image_results if 'original' in x]

    def separate_unique_results(self):
        """Load previously downloaded URLs and keep only the unseen ones."""
        # `with` closes the handle — the original leaked an open file.
        with open('datasets/previous_images.json') as f:
            previous_results = json.load(f)
        self.previous_results = previous_results['previous']
        self.unique_results = list(
            set(self.results).difference(self.previous_results)
        )

    def download(self, link):
        """Download one image into datasets/test/<query>/ under the next
        free numeric file name (0, 1, 2, ...)."""
        print("Downloading {}".format(link))
        ## Find latest file name
        target_dir = "datasets/test/{}".format(self.query.q)
        # makedirs(exist_ok=True) replaces the original listdir/mkdir pair:
        # it avoids the check-then-create race and also works when
        # datasets/test itself does not exist yet.
        os.makedirs(target_dir, exist_ok=True)
        file_names = os.listdir(target_dir)
        if file_names:
            numbers = [int(x.split('.')[0]) for x in file_names]
            latest_file_name = str(max(numbers) + 1)
        else:
            latest_file_name = "0"
        ## Download and guess the file format
        r = requests.get(link)
        extension = mimetypes.guess_extension(
            r.headers.get('content-type', '').split(';')[0]
        )
        # Skip HTML payloads (error/redirect pages rather than images);
        # fall back to .jpg when the content type is unrecognized.
        if extension != ".html":
            path = "{}/{}{}".format(target_dir, latest_file_name, extension or '.jpg')
            with open(path, 'wb') as f:
                f.write(r.content)

    def update_downloaded_images(self):
        """Merge the new URLs into the JSON ledger of downloaded images."""
        new_results = list(set(self.unique_results + self.previous_results))
        self.new_results = {"previous": new_results}
        with open('datasets/previous_images.json', 'w') as f:
            json.dump(self.new_results, f)

    def download_all_images(self):
        """Search, drop already-seen links, download the rest, update the ledger."""
        self.serpapi_search()
        self.separate_unique_results()
        for link in self.unique_results:
            # Best effort: one failed download must not abort the batch.
            # Catch Exception (not a bare except) so KeyboardInterrupt and
            # SystemExit still propagate, and print why the link was skipped.
            try:
                self.download(link)
            except Exception as e:
                print("Passed {} ({})".format(link, e))
        self.update_downloaded_images()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment