@McFlat
Created September 27, 2019 22:28
Google images downloader
#!/usr/bin/env python3
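"""Download the original-size images that a Google Images search returns.

Usage sketch (the file name google_images.py is an assumption, not part of
the gist; the flags are the ones defined in main() below, and -q/--query may
be repeated):

    python3 google_images.py -q "golden retriever" -q "siamese cat"
    python3 google_images.py --refresh -P ./Pictures
"""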
from __future__ import print_function

IMPORTED = False

import os
import sys
import json
import argparse
import string
import concurrent.futures
import multiprocessing
from urllib import parse, request
from hashlib import md5
import subprocess

try:
    import requests
    from bs4 import BeautifulSoup
    IMPORTED = True
except ImportError as e:
    print('\033[31m@ Error: %s \033[0m' % str(e))
    print('pip3 install requests bs4')
    # Try to install the missing third-party packages, then import again
    pip3output = subprocess.check_output(
        'pip3 install requests bs4',
        stderr=subprocess.STDOUT,
        shell=True
    )
    print(pip3output)
    try:
        import requests
        from bs4 import BeautifulSoup
        IMPORTED = True
    except ImportError as e:
        print('\033[31m@ Error: %s \033[0m' % str(e))
        sys.exit(1)

GOOGLE_IMAGES_URL = 'https://www.google.com/search?q={query}&source=lnms&tbm=isch'
USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36'
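# 'tbm=isch' selects Google's image-search vertical. Each result on that page
# embeds its metadata as a JSON blob inside a div.rg_meta element, which
# Downloader.extract_images() parses below.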


class DownloaderException(BaseException):
    pass


class Filesystem(object):
    @staticmethod
    def makedirs(directory):
        # Create the target directory if it does not exist yet
        if not os.path.exists(directory):
            os.makedirs(directory)

    @staticmethod
    def get_queries_from_pictures_directory(directory):
        # Treat every subdirectory name under `directory` as a past query
        queries = set()
        if os.path.exists(directory):
            for f in os.scandir(directory):
                if f.is_dir():
                    queries.add(os.path.basename(f.path))
        return list(queries)


class Downloader(object):
    @staticmethod
    def download_webpage(query):
        session = requests.Session()
        session.headers['User-Agent'] = USER_AGENT
        # Get the Google Images results page for the query
        response = session.get(GOOGLE_IMAGES_URL.format(query=query))
        return BeautifulSoup(response.text, 'html.parser')

    @staticmethod
    def extract_images(soup, directory, query):
        images = []  # holds the URL of the large original image, its type, and the target path
        for a in soup.find_all("div", {"class": "rg_meta"}):
            data = json.loads(a.text)
            # Keys in the rg_meta JSON blob:
            # "ou" original url, "ow" original width, "oh" original height,
            # "tu" thumb url, "tw" thumb width, "th" thumb height,
            # "ity" image type, "ru" resource url, "pt" page title
            p = os.path.splitext(
                parse.unquote(os.path.basename(parse.urlparse(data['ou']).path.strip('/')))
            )
            # Sanitize the file name and make sure it carries a known image extension
            stem = (p[0]
                    .replace('+', '-')
                    .replace('--', '-')
                    .replace('--', '-')
                    .replace('__', '_')
                    .replace('__', '_'))
            if not p[1] or p[1] not in ['.jpg', '.png', '.gif', '.jpeg', '.bmp']:
                filename = stem + '.jpg'
                if len(filename) > 250:
                    filename = filename[:250]
            else:
                filename = stem + p[1]
                if len(filename) > 210:
                    filename = filename[:210]
            path = os.path.join(directory, filename)
            # Drop any non-printable characters from the final path
            path = ''.join(filter(lambda x: x in string.printable, path))
            images.append({
                'url': data['ou'],
                'path': path,
                'type': data['ity'],
                'query': query
            })
        return images
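    # Illustrative rg_meta payload (field values invented for the example):
    # {"ou": "http://example.com/photos/dog.jpg", "ow": 1024, "oh": 768,
    #  "tu": "https://encrypted-tbn0.gstatic.com/...", "tw": 160, "th": 120,
    #  "ity": "jpg", "ru": "http://example.com/gallery.html", "pt": "A dog"}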

    @staticmethod
    def download_images(timeout=180):
        def inner_di(arg):
            try:
                req = request.Request(arg['url'])
                req.add_header('User-Agent', USER_AGENT)
                result = request.urlopen(req, timeout=timeout)
                if result.status == 200:
                    res = result.read()
                    if res:
                        # Suffix the MD5 of the content so files with the same
                        # name from different sources do not overwrite each other
                        p = os.path.splitext(arg['path'])
                        arg['path'] = '%s-%s%s' % (p[0], md5(res).hexdigest(), p[1])
                        print(arg['path'])
                        with open(arg['path'], 'wb') as f:
                            f.write(res)
            except (Exception, KeyboardInterrupt) as e:
                print('\033[31mError:\033[0m q:%s - p:%s - u:%s - e:%s' % (arg['query'], arg['path'], arg['url'], e))
            return None
        return inner_di
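
# download_images() returns a one-argument closure rather than downloading
# directly: run_io_bound() below maps a single-argument callable over its
# items, and the closure keeps the timeout bound without a partial or lambda.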


class Runner(object):
    @staticmethod
    def run_io_bound(items, callback):
        try:
            # One worker thread per CPU core; futures complete in whatever
            # order the downloads finish
            with concurrent.futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as ex:
                future_to_item = {ex.submit(callback, i): i for i in items}
                for future in concurrent.futures.as_completed(future_to_item):
                    item = future_to_item[future]
                    try:
                        res = future.result()
                    except Exception as exc:
                        # print('%r generated an exception: %s' % (item, exc))
                        raise exc
                    finally:
                        pass  # print(res)
        except KeyboardInterrupt:
            pass

    @staticmethod
    def process_pictures_query(timeout):
        def inner_ppq(args):
            query = string.capwords(args['query'])
            pictures_directory = args['pictures_directory']
            parent_directory = args['parent_directory']
            download_images_timeout = args['timeout']
            if pictures_directory != '':
                directory = pictures_directory
            else:
                directory = os.path.join(parent_directory, query)
            Filesystem.makedirs(directory)
            html = Downloader.download_webpage(query.replace(' ', '+'))
            images = Downloader.extract_images(html, directory, query)
            Runner.run_io_bound(images, Downloader.download_images(download_images_timeout))
        return inner_ppq
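

# Note the two levels of fan-out: run_io_bound() is called once in main() to
# process the query list in parallel, and again inside inner_ppq to download
# each query's images in parallel.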
def main(argv):
    parser = argparse.ArgumentParser(
        add_help=False, description='Download Google Images'
    )
    parser.add_argument(
        '--help', '-h', action='help', default=argparse.SUPPRESS, help='Show this help message and exit'
    )
    parser.add_argument(
        '--query', '-q', help='Search queries', action='append'
    )
    parser.add_argument(
        '--parent-directory', '-P', help='Parent directory; default: "./Pictures"', default='./Pictures'
    )
    parser.add_argument(
        '--pictures-directory', '-p', help='Pictures directory, overrides the "parent-directory + query" path', default=''
    )
    parser.add_argument(
        '--timeout', '-t', help='Download timeout in seconds', default=180, type=int
    )
    parser.add_argument(
        '--refresh', '-r', help='Redownload images for all query directories in parent-directory',
        dest='refresh', action='store_true', default=False
    )

    operations = []

    def add_operation(args, q):
        data = {
            'parent_directory': args.parent_directory,
            'pictures_directory': args.pictures_directory,
            'timeout': args.timeout,
            'query': q
        }
        operations.append(data)

    try:
        args = parser.parse_args(argv)
        if not args.refresh:
            print(args.query)
            for q in args.query:
                add_operation(args, q)
            Runner.run_io_bound(operations, Runner.process_pictures_query(args.timeout))
        else:
            queries = Filesystem.get_queries_from_pictures_directory(args.parent_directory)
            print(queries)
            for q in queries:
                add_operation(args, q)
            Runner.run_io_bound(operations, Runner.process_pictures_query(args.timeout))
    except Exception as e:
        print('\033[31m@ Error: %s \033[0m' % str(e))
        sys.exit(1)


if __name__ == "__main__":
    main(sys.argv[1:])