@dahlia
Created December 27, 2010 12:13
Magic Image Crawler!
#!/usr/bin/env python
""":mod:`magic_image_crawler` --- Magic Image Crawler!
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This small program crawls the given URL and downloads only the most important
images of the page. Trivial images like buttons and banners are ignored.
"""
import re
import itertools
import urlparse
import urllib2
import tempfile
import os.path
import lxml.etree
import lxml.html
import Image
import ImageChops
#: The :class:`~lxml.etree.XPath` that finds image URLs.
IMG_SRC_XPATH = lxml.etree.XPath("//img[@src]/@src")
#: The default minimum area (width * height) an image must have to be kept.
DEFAULT_MINAREA = 200 * 250
#: The default tolerance for treating background pixels.
DEFAULT_BGTOLERANCE = 10


def image_urls(url, root_url=None):
    """Finds only important image URLs from the given ``url``.

    :param url: the URL of the page to crawl
    :type url: :class:`basestring`
    :param root_url: an optional URL of the home page
    :type root_url: :class:`basestring`
    :returns: a list of found image URLs
    :rtype: :class:`list`

    """
    doc = lxml.html.parse(url)
    charset = doc.xpath("//meta[@charset]/@charset")
    if charset:
        charset = charset[0]
    else:
        ns = lxml.etree.FunctionNamespace(None)
        ns["lower-case"] = lambda dummy, seq: seq[0].lower() if seq else None
        charset = doc.xpath("//meta[lower-case(@http-equiv)='content-type']"
                            "/@content")
        if charset:
            m = re.search(r";\s*charset\s*=\s*(\S+)\s*", charset[0], re.I)
            charset = m.group(1) if m else "utf-8"
        else:
            charset = "utf-8"
    images = [urlparse.urljoin(url, src) for src in IMG_SRC_XPATH(doc)]
    # Images whose source appears more than once on the page are probably
    # layout elements like buttons, so keep only sources that occur once.
    images = [img for img in images if images.count(img) < 2]
    root_url = root_url or urlparse.urljoin(url, "/")
    root_doc = lxml.html.parse(root_url)
    root_images = [urlparse.urljoin(root_url, src)
                   for src in IMG_SRC_XPATH(root_doc)]
    return [img.encode(charset) if isinstance(img, unicode) else img
            for img in images if img not in root_images]
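
# A rough usage sketch for image_urls() (the URLs are hypothetical
# placeholders; actual results depend on the page being crawled):
#
#     >>> image_urls("http://example.com/posts/123")
#     ['http://example.com/img/photo1.jpg', 'http://example.com/img/photo2.jpg']
#
# Images that also appear on the root page (or on ``root_url`` when given)
# are treated as layout chrome and excluded from the result.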


def download_urls(urls):
    """Downloads the files at the given URLs. This is a generator function
    that yields file objects.

    :param urls: a list of URLs to download
    :type urls: iterable object
    :returns: the downloaded file objects
    :rtype: iterable object

    """
    for url in urls:
        u = urllib2.urlopen(url)
        f = tempfile.SpooledTemporaryFile()
        while True:
            # Copy the response into a temporary file in 4 KiB chunks.
            buffer = u.read(0x1000)
            if buffer:
                f.write(buffer)
            else:
                break
        f.seek(0)
        u.close()
        yield f
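
# A rough pipeline sketch chaining image_urls() and download_urls()
# (hypothetical URL; nothing is fetched until the generator is consumed):
#
#     >>> files = download_urls(image_urls("http://example.com/posts/123"))
#     >>> for f in files:
#     ...     im = Image.open(f)
#
# Each yielded object is a SpooledTemporaryFile already rewound to offset 0,
# so it can be passed directly to Image.open().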


def split_image(image, minarea=DEFAULT_MINAREA,
                bgtolerance=DEFAULT_BGTOLERANCE, gap=1, recurse=3):
    """Does a zealous crop and splits the given ``image`` into several
    smaller images.

    :param image: a PIL image to split
    :param minarea: a minimum width * height size for filtering.
                    default is :const:`DEFAULT_MINAREA`
    :type minarea: :class:`int`, :class:`long`
    :param bgtolerance: a tolerance for treating background pixels.
                        default is :const:`DEFAULT_BGTOLERANCE`
    :type bgtolerance: :class:`int`
    :param gap: a minimum gap height in pixels. default is 1
    :type gap: :class:`int`
    :param recurse: a maximum recursion depth. default is 3
    :type recurse: :class:`int`
    :returns: the split images
    :rtype: iterable object

    """
    if not Image.isImageType(image):
        raise TypeError("image must be a PIL image object, not " + repr(image))
    if gap < 1:
        raise TypeError("gap must be greater than 0")
    if image.mode != "RGB":
        image = image.convert("RGB")
    bgcolor = 255, 255, 255
    def _isbg(color):
        return sum(abs(a - b) for a, b in zip(color, bgcolor)) <= bgtolerance
    if bgtolerance < 1:
        _isbg = lambda color: color == bgcolor
    # Zealous crop: trim the background border around the whole image first.
    bg = Image.new("RGB", image.size, bgcolor)
    diff = ImageChops.difference(image, bg)
    image = image.crop(diff.getbbox())
    pixels = image.load()
    width, height = image.size
    empty = 0
    top = 0
    for y in xrange(height):
        if all(_isbg(pixels[x, y]) for x in xrange(width)):
            empty += 1
        elif empty >= gap:
            # A horizontal background band just ended; emit the part above it.
            if width * (y - empty - top) >= minarea:
                inner_image = image.crop((0, top, width, y - empty))
                if recurse > 0:
                    # Rotate and recurse to split along the other axis as well.
                    inner_image = inner_image.rotate(90)
                    _imgs = split_image(inner_image,
                                        minarea=minarea,
                                        bgtolerance=bgtolerance,
                                        recurse=recurse - 1)
                    for _i in _imgs:
                        yield _i.rotate(270)
                else:
                    yield inner_image
            empty = 0
            top = y
    # Emit the remaining bottom part if it is large enough.
    if empty < gap and width * (height - top) >= minarea:
        inner_image = image.crop((0, top, width, height))
        if recurse:
            inner_image = inner_image.rotate(90)
            _imgs = split_image(inner_image,
                                minarea=minarea,
                                bgtolerance=bgtolerance,
                                recurse=recurse - 1)
            for _i in _imgs:
                yield _i.rotate(270)
        else:
            yield inner_image
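
# A rough usage sketch for split_image() (the file name is a hypothetical
# placeholder): an image that stacks several photos on a plain white
# background is cut apart at the blank bands, and pieces whose area is
# smaller than ``minarea`` are dropped.
#
#     >>> im = Image.open("stacked_photos.png")
#     >>> for i, part in enumerate(split_image(im)):
#     ...     part.save("part-%03d.png" % i)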


def magic(url, root_url=None, save_path=".",
          minarea=DEFAULT_MINAREA, bgtolerance=DEFAULT_BGTOLERANCE):
    """Spells and does magic!

    :param url: the URL to crawl
    :type url: :class:`basestring`
    :param root_url: an optional URL of the related page or home page
    :type root_url: :class:`basestring`
    :param save_path: a path to save images
    :type save_path: :class:`basestring`
    :param minarea: a minimum image area for filtering.
                    default is :const:`DEFAULT_MINAREA`
    :type minarea: :class:`int`, :class:`long`
    :param bgtolerance: a tolerance for treating background pixels.
                        default is :const:`DEFAULT_BGTOLERANCE`
    :type bgtolerance: :class:`int`
    :returns: a generator that saves images and yields their paths
    :rtype: iterable object

    """
    parsed_url = urlparse.urlparse(url)
    files = download_urls(image_urls(url, root_url))
    def _images():
        for file in files:
            try:
                im = Image.open(file)
            except IOError:
                continue
            if im.size[0] * im.size[1] >= minarea:
                yield split_image(im, minarea=minarea, bgtolerance=bgtolerance)
    # Build a file name key from the digits found in the URL path.
    m = re.match(r"^https?://[^/]+/(.*)$", url, re.IGNORECASE)
    if m:
        key = "-".join(d.group(0) for d in re.finditer(r"\d+", m.group(1))) \
              + "-"
    else:
        key = ""
    fmt = ("{0}-" + key + "{1:03d}.png").format
    n = 1
    for image_set in _images():
        for im in image_set:
            # Skip numbers whose target file already exists.
            while True:
                path = os.path.join(save_path, fmt(parsed_url.hostname, n))
                if os.path.isfile(path):
                    n += 1
                else:
                    break
            im.save(path)
            n += 1
            yield path
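
# A rough usage sketch for magic() (hypothetical URL and directory):
#
#     >>> for path in magic("http://example.com/posts/123", save_path="imgs"):
#     ...     print path
#
# The generator downloads, filters, splits and saves lazily; each yielded
# value is the path of a PNG file that has just been written.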


def _download(kwargs):
    """A picklable helper for the worker pool: runs :func:`magic` with the
    given keyword arguments and prints each saved path."""
    result = magic(**kwargs)
    for path in result:
        print path
    return kwargs["url"]


def unique_everseen(iterable, key=None):
    """List unique elements, preserving order. Remember all elements ever
    seen.

    .. sourcecode:: pycon

       >>> list(unique_everseen('AAAABBBCCDAABBB'))
       ['A', 'B', 'C', 'D']
       >>> list(unique_everseen('ABBCcAD', str.lower))
       ['A', 'B', 'C', 'D']

    .. note:: Copied from :mod:`itertools` recipes.

    """
    seen = set()
    seen_add = seen.add
    if key is None:
        for element in itertools.ifilterfalse(seen.__contains__, iterable):
            seen_add(element)
            yield element
    else:
        for element in iterable:
            k = key(element)
            if k not in seen:
                seen_add(k)
                yield element


def main():
    import optparse
    import os
    import multiprocessing
    multiprocessing.freeze_support()
    parser = optparse.OptionParser(usage="%prog [options] URL...")
    parser.add_option("-r", "--root-url", metavar="URL", default=None,
                      help="an optional URL of the related page or home page.")
    parser.add_option("-d", "--save-path", metavar="DIR", default=".",
                      help="a DIR path to save images. [%default]")
    parser.add_option("-a", "--min-area",
                      type="int", metavar="AREA", default=DEFAULT_MINAREA,
                      help="a minimum image area for filtering. [%default]")
    parser.add_option("-t", "--bgtolerance", "--background-tolerance",
                      type="int", metavar="TOLERANCE",
                      default=DEFAULT_BGTOLERANCE,
                      help="a TOLERANCE for treating background pixels. "
                           "[%default]")
    parser.add_option("-w", "--workers", type="int", metavar="NUM", default=3,
                      help="the number of workers [%default]")
    options, urls = parser.parse_args()
    if not urls:
        parser.error("one or more URLs to crawl are required.")
    urls = itertools.chain.from_iterable(url.split() for url in urls)
    urls = list(unique_everseen(urls))  # remove duplicates
    if not os.path.isdir(options.save_path):
        os.makedirs(options.save_path)
    args = [("root_url", options.root_url),
            ("save_path", options.save_path),
            ("minarea", options.min_area),
            ("bgtolerance", options.bgtolerance)]
    pool_size = min(len(urls), options.workers)
    if pool_size < 2:
        # Run sequentially when there is only one URL or only one worker.
        for i, url in enumerate(urls):
            _args = dict(args)
            _args["url"] = url
            if not _args["root_url"] and len(urls) > 1:
                # Use another crawled URL as the root page so that images
                # shared between the pages (layout chrome) are filtered out.
                _args["root_url"] = urls[0 if i > 0 else 1]
            _download(_args)
            print "[complete]", url
        return
    pool = multiprocessing.Pool(pool_size)
    argslist = []
    for i, url in enumerate(urls):
        _args = dict(args)
        _args["url"] = url
        if not _args["root_url"] and len(urls) > 1:
            _args["root_url"] = urls[0 if i > 0 else 1]
        argslist.append(_args)
    result = pool.imap_unordered(_download, argslist)
    for url in result:
        print "[complete]", url


if __name__ == "__main__":
    main()