Last active
May 30, 2022 14:38
-
-
Save greyblue9/6ab9a77cf931b16d2c063c24d70b38ea to your computer and use it in GitHub Desktop.
Download images from a website using requests, lxml, and bs4.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import bs4 | |
import sys | |
import time | |
import random | |
import requests | |
import urllib.parse | |
import urllib.request | |
from urllib.error import HTTPError as UrllibHTTPError | |
from requests.exceptions import HTTPError | |
from pathlib import Path | |
# --- Script configuration -------------------------------------------------
# The first CLI argument is the root URL of the site to scrape.
site = sys.argv[1] if len(sys.argv) > 1 else None
if not site:
    print(f'Usage: py {sys.argv[0]} "https://url"')
    sys.exit(1)

urls = []  # page URLs harvested from the sitemaps
dls = []   # (image_url, saved_filename) pairs downloaded so far
visited_file = Path(".visited.txt")  # persistent record of processed URLs
def visited(url):
    """Query the persistent visited-URL record.

    With ``url=None``, return the whole set of recorded URLs. Otherwise
    return True when *url* — either as given or joined absolute against
    the global ``site`` — appears in the record.
    """
    if not visited_file.exists():
        # Nothing recorded yet: empty set, or "not seen".
        return set() if url is None else False
    seen = set(visited_file.read_text().splitlines())
    if url is None:
        return seen
    return url in seen or urllib.parse.urljoin(site, url) in seen
def visit(url):
    """Append *url* (and its absolute form) to the visited record, once."""
    if visited(url):
        return
    absolute = urllib.parse.urljoin(site, url)
    with open(visited_file, "a+") as log:
        # One entry per line; both forms so either spelling matches later.
        log.write(url + "\x0a")
        log.write(absolute + "\x0a")
# Discover sitemap URLs advertised by robots.txt.
# NOTE(review): verify=False disables TLS certificate validation — kept to
# preserve the original behavior, but unsafe on untrusted networks.
robots_text = requests.get(
    urllib.parse.urljoin(site, "/robots.txt"),
    verify=False,
    timeout=30,  # BUGFIX: no timeout meant a dead host could hang forever
).text
sitemap_urls = [
    # "Sitemap: <url>  # comment" -> " <url>" (comment stripped; the
    # split(":", 1) keeps the URL's own "://" intact).
    ln.strip().split("#")[0].split(":", 1)[1]
    for ln in robots_text.splitlines()
    if ln.lower().strip().startswith("sitemap:")
]
if not sitemap_urls:
    # robots.txt did not advertise a sitemap; probe common locations.
    print("Trying well-known sitemap URIs ...")
    sitemap_urls += [
        urllib.parse.urljoin(site, "/sitemap.xml"),
        urllib.parse.urljoin(site, "/sitemap"),
    ]
sitemaps = []
# Breadth-first crawl of the sitemap tree: fetch every queued sitemap URL
# into parsed documents, then harvest page URLs and nested sitemap URLs
# from each document until both queues are drained.
while sitemaps or sitemap_urls:
    while sitemap_urls:
        sitemap_url = sitemap_urls.pop(0).strip()
        print(f"Fetching sitemap {sitemap_url!r} ... ")
        sitemap = bs4.BeautifulSoup(
            # BUGFIX: added a timeout so one unresponsive sitemap host
            # cannot stall the whole crawl indefinitely.
            requests.get(sitemap_url, timeout=30).content,
            features="lxml-xml",  # sitemaps are XML, not HTML
        )
        sitemaps.append(sitemap)
    while sitemaps:
        sitemap = sitemaps.pop(0)
        # <url><loc> entries are concrete pages to scrape.
        new_urls = [
            loc.text.strip()
            for loc in sitemap.select("url loc")
        ]
        # <sitemap><loc> entries are nested sitemap indexes; skip any we
        # already visited in a previous run.
        new_map_urls = [
            loc.text.strip()
            for loc in sitemap.select("sitemap loc")
            if not visited(loc.text.strip())
        ]
        print(f" - Found {len(new_map_urls)} new sitemaps")
        print(f" - Found {len(new_urls)} new urls")
        urls.extend(new_urls)
        sitemap_urls.extend(new_map_urls)
# De-duplicate the harvested URLs and drop any already processed earlier.
_urls = list(set(urls).difference(visited(None)))
# Before you remove this limit, I ask you to exercise
# responsibility in using this tool (or any tool)
# and do not either
# - scrape a site whose maintainer would not want to
#   be scraped.
# - do anything bad or unwanted with the images you obtain.
# - do anything creepy or dishonest or use the
#   possession of this tool / images to make someone
#   feel stalked or weird or negatively
# - use the images to prank or annoy someone
# - be a dick.
# Otherwise you absolutely do not deserve to be using
# this code and you know what you are doing is wrong.
# Be good and responsible so we can all have nice things.
urls = _urls[:5]
# Patterns and the UA string, hoisted out of the per-page loop.
_TRAILING_EXT_RE = re.compile("([.](?:[a-z]{3,4}))+$", re.IGNORECASE)
_UNSAFE_CHARS_RE = re.compile(r"[^a-zA-Z0-9_(), .-]+")
_UA = (
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36"
)
for i, url in enumerate(urls):
    if visited(url):
        continue
    print(f"Fetching URL {i+1} of {len(urls)}: {url!r}")
    try:
        resp = requests.get(
            url,
            headers={"Referer": site, "Origin": site, "User-Agent": _UA},
            timeout=30,  # BUGFIX: avoid hanging forever on a dead host
        )
        # BUGFIX: .get() — a missing Content-Type header raised KeyError.
        if resp.headers.get("Content-Type", "").startswith("image/"):
            # The URL is itself an image: fabricate one entry with a
            # placeholder <img> tag so the loop below has uniform input.
            images = [
                (None, url, bs4.BeautifulSoup("<img />", "html.parser")),
            ]
        else:
            # BUGFIX: "lxml-html" is not a registered bs4 feature name
            # (valid: "lxml", "lxml-xml", "html.parser", ...); the
            # original raised FeatureNotFound here on every HTML page.
            doc = bs4.BeautifulSoup(resp.content, features="lxml")
            # Prefer images with a non-empty alt text, skipping thumbnails.
            # BUGFIX: guard the walrus result — tags with neither src nor
            # data-src made the original call None.lower().
            images = [
                (tag.attrs.get("alt"), src, tag)
                for tag in doc.select('img[alt!=""]')
                if (src := tag.attrs.get("data-src")
                    or tag.attrs.get("src"))
                and "thumb" not in src.lower()
            ]
            if not images:
                # Fall back to any non-GIF image on the page.
                images = [
                    (None, src, tag)
                    for tag in doc.select(
                        'img:not([src*=".gif"]):not([data-src*=".gif"])'
                    )
                    if (src := tag.attrs.get("data-src")
                        or tag.attrs.get("src"))
                    and "thumb" not in src.lower()
                ]
        for alt, uri, img in images:
            if alt:
                # Strip trailing extension-like suffixes from the alt text
                # so it can serve as a base filename.
                alt = _TRAILING_EXT_RE.subn("", alt)[0]
            w = int(img.attrs.get("width") or "1000")
            h = int(img.attrs.get("height") or "1000")
            # Small images with common banner/thumbnail aspect ratios are
            # probably page decoration; skip them.
            if w < 1000 and (w == h or w * 1.5 == h or w * 2 == h):
                continue
            if not uri:
                continue
            p = Path(urllib.parse.urlsplit(uri)[2])
            if p.exists():
                continue
            name = f"{alt or p.stem}{p.suffix.lower()}"
            # Absolute image URL, with query string and fragment dropped.
            imgurl = "{}://{}{}".format(
                *urllib.parse.urlsplit(
                    urllib.parse.urljoin(url, uri)
                )[:3]
            )
            if visited(imgurl):
                continue
            # BUGFIX: the original passed re.DOTALL (== 16) as re.subn's
            # positional *count* argument, silently capping replacements
            # at 16; the flag is meaningless for a character class anyway.
            name = _UNSAFE_CHARS_RE.subn("_", name)[0]
            p2 = Path(name)
            if p2.exists():
                continue
            print(f" - Downloading {name!r} ", end="")
            try:
                r = requests.get(
                    imgurl,
                    headers={
                        "Referer": url,
                        "Origin": site,
                        "User-Agent": _UA,
                    },
                    timeout=60,  # BUGFIX: bound the download wait
                )
                r.raise_for_status()
                p2.write_bytes(r.content)
                # Sniff the real image type from the magic bytes; fall
                # back to the Content-Type header when unrecognized.
                with open(p2, "rb") as f:
                    magic = f.read(32)
                ext = (
                    "jpg" if b"JFIF" in magic else
                    "png" if b"IHDR" in magic else
                    "gif" if (
                        b"89a" in magic or
                        b"87a" in magic or
                        b"GIF" in magic
                    ) else (
                        r.headers.get("Content-Type", "")
                        .split("/", 1)[-1].lower().replace("jpeg", "jpg")
                    )
                )
                newname = f"{alt or p.stem}.{ext}"
                newname = _UNSAFE_CHARS_RE.subn("_", newname)[0]
                # Rename to the sniffed extension unless that name is
                # already taken (then keep the download we just wrote).
                if newname != name and Path(name).exists():
                    if not Path(newname).exists():
                        print(f"-> {newname!r} [Ok]")
                        Path(name).rename(newname)
                    else:
                        print(" [Already exists]")
                        visit(imgurl)
                        continue
                else:
                    print("[OK]")
                dls.append((imgurl, newname))
                visit(imgurl)
            except HTTPError as herr1:
                print(f" - {herr1=} {': '.join(map(str, herr1.args))}")
    except HTTPError as herr:
        print(f" - {herr} {': '.join(map(str, herr.args))}")
    except Exception:
        from traceback import print_exc
        print_exc()
        break
    visit(url)
    # Polite crawling: random delay between page fetches.
    wait = random.randint(1, 7)
    print(f"Wait random period between pages: {wait}s")
    time.sleep(wait)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment