Created
June 2, 2020 22:09
-
-
Save jeremy-rifkin/5a3f91728028d05b313d05b383adfd5b to your computer and use it in GitHub Desktop.
Filter black and white images from a directory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Filter black and white images from a directory.
# Walks the photos/ directory and copies black-and-white photos
# to the bw/ directory.
# Runs in parallel to help with analyzing large
# batches of photos.
import colorsys | |
import multiprocessing | |
import os | |
import shutil | |
import sys | |
import traceback | |
from PIL import Image, ImageFile # pillow is the only external dependency | |
# Let PIL be flexible with corrupted image files | |
ImageFile.LOAD_TRUNCATED_IMAGES = True | |
# File extensions (lower-case) that are treated as movies and never analysed.
_VIDEO_EXTENSIONS = (".mov", ".mp4")


def _collect_jobs(photo_dir):
    """Return a ``[root, file]`` pair for every non-movie file under *photo_dir*.

    The directory tree is walked recursively with ``os.walk``. Files whose
    extension (compared case-insensitively) is ``.mov`` or ``.mp4`` are
    reported on stderr and left out. No attempt is made here to check that the
    remaining files are images.

    Args:
        photo_dir: Path of the directory tree to scan.

    Returns:
        A list of two-element lists ``[root, file]``, where *root* is the
        directory containing the file and *file* is its base name.
    """
    found = []
    for root, _dirs, files in os.walk(photo_dir):
        for file in files:
            extension = os.path.splitext(file)[1].lower()
            # skip movie files
            if extension in _VIDEO_EXTENSIONS:
                sys.stderr.write("skipping {}\n".format(file))
                continue
            found.append([root, file])
    return found


# get list of files to be processed
# NOTE: this runs at import time. Under multiprocessing's "spawn" start method
# the main module is re-imported in every worker process, so the walk (and its
# "skipping" messages) is repeated once per worker.
jobs = _collect_jobs("photos")
def worker(job):
    """Copy one image into ``bw/`` if it is almost entirely black or white.

    A pixel counts as black/white when its HLS lightness is >= 0.9 or <= 0.1
    and its HLS saturation is <= 0.2. The file is copied when at least 95% of
    its pixels meet that test.

    Any failure (unreadable or non-image file, copy error, ...) is reported on
    stderr with a traceback and otherwise ignored, so one bad file does not
    stop the batch.

    Args:
        job: A ``[root, file]`` pair - the directory containing the file and
            the file's base name.

    Returns:
        The base name *file*, whether or not the file was copied or an error
        occurred.
    """
    root, file = job
    source_path = os.path.join(root, file)
    try:
        # The context manager closes the underlying file handle even when
        # decoding fails part-way through.
        with Image.open(source_path) as img:
            # Normalise to RGB so every pixel is an (r, g, b) triple whatever
            # the file's native mode is (grayscale, palette, alpha, ...).
            rgb = img.convert("RGB")
        total_pixels = rgb.width * rgb.height
        # getcolors yields (count, colour) pairs; with maxcolors equal to the
        # pixel count it cannot overflow, so each distinct colour is
        # classified once instead of once per pixel.
        bw_count = 0
        for count, (r, g, b) in rgb.getcolors(maxcolors=total_pixels):
            _hue, lightness, saturation = colorsys.rgb_to_hls(r / 255, g / 255, b / 255)
            # threshold for bw
            if (lightness >= .9 or lightness <= .1) and saturation <= .2:
                bw_count += count
        # copy file to output if the image is >=95% black/white
        if bw_count / total_pixels >= .95:
            # Create the output directory on demand; exist_ok makes this safe
            # when several worker processes reach it at the same time.
            os.makedirs("bw", exist_ok=True)
            shutil.copyfile(source_path, os.path.join("bw", file))
    except Exception:
        # the main error this handles is "OSError: cannot identify image file",
        # which occurs with corrupted image files or non-image files.
        # KeyboardInterrupt / SystemExit are deliberately not caught.
        sys.stderr.write("error with processing {}\n".format(file))
        traceback.print_exc()
    return file
if __name__ == '__main__':  # windows support: multiprocessing needs the main guard
    # setup the pool and start processing jobs
    worker_count = multiprocessing.cpu_count()
    total = len(jobs)
    print("processing with {} threads".format(worker_count))
    with multiprocessing.Pool(processes=worker_count) as pool:
        # imap_unordered yields each result as soon as a worker finishes it.
        # Counting from 1 makes the display show completed files, so the last
        # update reads total/total and 100%.
        for done, f in enumerate(pool.imap_unordered(worker, jobs), start=1):
            # \033[K clears the rest of the line; \r rewinds so the next
            # update overwrites this one.
            print("{:.02f}% {}/{} {}\033[K".format(100 * done / total, done, total, f), end="\r")
    if total:
        # Move past the final progress line so the shell prompt does not
        # overwrite it.
        print()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment