Skip to content

Instantly share code, notes, and snippets.

@jeremy-rifkin
Created June 2, 2020 22:09
Show Gist options
  • Save jeremy-rifkin/5a3f91728028d05b313d05b383adfd5b to your computer and use it in GitHub Desktop.
Save jeremy-rifkin/5a3f91728028d05b313d05b383adfd5b to your computer and use it in GitHub Desktop.
Filter black and white images from a directory
# Filter black-and-white images from a directory.
# Walks the photos/ directory and copies every image whose pixels are at
# least 95% near-black or near-white into the bw/ directory.
# Runs in parallel across processes to help with analyzing large
# batches of photos.
import colorsys
import multiprocessing
import os
import shutil
import sys
import traceback
from PIL import Image, ImageFile # pillow is the only external dependency
# Let PIL be flexible with corrupted image files: with this module-level flag
# set, Pillow loads truncated image files instead of refusing them, so a
# partially written photo can still be analyzed.
ImageFile.LOAD_TRUNCATED_IMAGES = True
def collect_jobs(base_dir="photos", skip_extensions=(".mov", ".mp4")):
    """Return a ``[directory, filename]`` job for every candidate file.

    Walks *base_dir* recursively. Files whose extension (compared
    case-insensitively) is listed in *skip_extensions* are reported on
    stderr and left out. A missing *base_dir* simply yields no jobs.

    Args:
        base_dir: Directory tree to scan.
        skip_extensions: Extensions, including the leading dot, to ignore.

    Returns:
        A list of two-element lists ``[directory, filename]``.
    """
    skipped = tuple(extension.lower() for extension in skip_extensions)
    found = []
    for root, _dirs, files in os.walk(base_dir):
        for file in files:
            # skip movie files
            if os.path.splitext(file)[1].lower() in skipped:
                sys.stderr.write("skipping {}\n".format(file))
                continue
            found.append([root, file])
    return found


# get list of files to be processed
# Only the main process needs the list. When workers are started by
# re-importing this script (the "spawn" start method, e.g. on Windows), an
# unguarded walk would be repeated, and every "skipping" message printed
# again, in each worker process.
jobs = collect_jobs() if __name__ == "__main__" else []
def _is_black_or_white(red, green, blue):
    """Return True when an 8-bit RGB color counts as black or white.

    A color qualifies when its HLS lightness is at least 0.9 or at most 0.1
    and its HLS saturation is at most 0.2.
    """
    _hue, lightness, saturation = colorsys.rgb_to_hls(
        red / 255, green / 255, blue / 255
    )
    # threshold for bw
    return (lightness >= .9 or lightness <= .1) and saturation <= .2


def worker(job):
    """Analyze one file and copy it to ``bw/`` if it is black and white.

    Args:
        job: A ``(directory, filename)`` pair naming the file to analyze.

    Returns:
        The filename, whether or not the file was copied or could be read,
        so the caller can report progress.

    Any ordinary exception (for example "OSError: cannot identify image
    file" for corrupted or non-image files) is reported on stderr and
    swallowed so that one bad file does not stop the batch.
    """
    root, file = job
    source = os.path.join(root, file)
    try:
        # Normalize to RGB first: grayscale, 1-bit and palette images yield
        # bare ints from pixel access (not tuples), and CMYK/LA channels are
        # not RGB, so the color test below would otherwise fail or be wrong
        # for exactly the images this script is looking for.
        with Image.open(source) as img:
            rgb = img.convert("RGB")
        total = rgb.width * rgb.height
        if total == 0:
            # nothing to measure; avoids a division by zero below
            return file
        # Classify each distinct color once and weight it by how many pixels
        # use it, instead of testing every pixel individually. A picture
        # cannot have more distinct colors than pixels, so with this
        # maxcolors getcolors() always returns a list.
        bw_count = sum(
            count
            for count, color in rgb.getcolors(maxcolors=total)
            if _is_black_or_white(*color)
        )
        # copy file to output if the image is >=95% black/white
        if bw_count / total >= .95:
            os.makedirs("bw", exist_ok=True)
            # NOTE: files with the same name in different subdirectories
            # overwrite each other in bw/.
            shutil.copyfile(source, os.path.join("bw", file))
    except Exception:
        # the main error this handles is "OSError: cannot identify image file",
        # which occurs with corrupted image files or non-image files.
        # KeyboardInterrupt/SystemExit are deliberately not caught.
        sys.stderr.write("error with processing {}\n".format(file))
        traceback.print_exc()
    return file
if __name__ == '__main__': # windows support
    # The guard keeps worker processes that re-import this script (as happens
    # on Windows) from starting pools of their own.
    worker_count = multiprocessing.cpu_count()
    total = len(jobs)
    # make sure the output directory exists before any worker copies into it
    os.makedirs("bw", exist_ok=True)
    # setup the pool and start processing jobs
    print("processing with {} processes".format(worker_count))
    with multiprocessing.Pool(processes=worker_count) as pool:
        # Count from 1 so the display shows how many files are finished and
        # ends at 100% total/total rather than one short of it.
        for done, name in enumerate(pool.imap_unordered(worker, jobs), start=1):
            # "\033[K" clears the rest of the line; "\r" lets the next update
            # overwrite this one
            print("{:.02f}% {}/{} {}\033[K".format(100 * done / total, done, total, name), end="\r")
    if total:
        # finish the progress line so the shell prompt does not overwrite it
        print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment