Last active
June 28, 2023 11:58
-
-
Save jeremy-rifkin/4a39f6f111f53fc44a10ae2c3f0329cc to your computer and use it in GitHub Desktop.
A script to detect photos with "ifunny" watermarks and filter them from other photos.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is a program to filter photos with ifunny watermarks. | |
# I wrote this script for personal use to separate saved ifunny memes from other photos on my phone. | |
# The script is fully parallelized and is able to make good use of technologies like Intel | |
# hyperthreading because substantial program time is spent in IO. Bottleneck will be IO. | |
# | |
# Watermark false positive rate appears to be < 1/500 | |
# Watermark false negative rate appears to be ~ 1/500 | |
# False positive / negative rate checked with manual spot check sample size = 500. | |
# | |
# Rifkin 2020 | |
import colorsys | |
from PIL import Image, UnidentifiedImageError | |
import math | |
import statistics | |
import multiprocessing | |
import os | |
import shutil | |
import sys | |
# photos source and outputs
# Photos_Source is walked recursively for input files. Files with a detected watermark are
# copied to Memes_Output; everything else is copied to Photos_Output.
Photos_Source = "<...>"
Photos_Output = "<...>"
Memes_Output = "<...>"
# prints debug info and runs on different input
# (with DEBUG set, main() runs the detector over the "false_negatives" directory and prints the
# result for each file instead of sorting Photos_Source)
DEBUG = False
__STDEBUG = False # Single-threaded debug mode for exceptions in the worker thread
# black color detection parameters
# ifunny_black is the reference RGB value of the banner; the thresholds are the maximum allowed
# per-channel absolute difference from it.
ifunny_black = (23, 22, 27)
# narrow threshold: applied to the complete 8x8 blocks that lie fully inside the banner
black_threshold_narrow = 25 # 15 (presumably an earlier, stricter value -- TODO confirm)
# wide threshold: applied to the partial blocks around the edges of the banner and logo
black_threshold_wide = 60
# yellow detection parameters
# 3 standard deviation threshold
# mean hue = 0.1247 +/- 3 * 0.00988673
# (hue is on the 0..1 scale returned by colorsys.rgb_to_hsv)
ifunny_yellow_mean = 0.1247
ifunny_yellow_stdev = 0.00988673
# watermark size parameters
# watermark_size is (width, height) of the logo area in pixels; watermark_offset is the width in
# pixels of the strip between the logo area and the right edge of the image.
watermark_size = (135, 20)
watermark_offset = 7
# utilities for finding jpg blocks | |
def nearest_multiple_bias_up(n, r):
    """Round ``n`` up to the nearest multiple of ``r``.

    A value that is already a multiple of ``r`` is returned unchanged.
    """
    quotient, remainder = divmod(n, r)
    # any leftover means n sits inside a block, so move to the next boundary
    if remainder > 0:
        quotient += 1
    return quotient * r
def nearest_multiple_bias_down(n, r):
    """Round ``n`` down to the nearest multiple of ``r``.

    A value that is already a multiple of ``r`` is returned unchanged.
    """
    _, remainder = divmod(n, r)
    return n - remainder
# utilities for color detection | |
def is_within_black(color, threshold):
    """Return True when ``color`` is close enough to the ifunny banner black.

    Only the first three channels of ``color`` are examined; each must be
    within ``threshold`` (inclusive) of the matching channel of
    ``ifunny_black``.
    """
    return all(
        abs(color[channel] - ifunny_black[channel]) <= threshold
        for channel in range(3)
    )
def is_within_yellow(hues):
    """Return True when the collected hues look like the ifunny logo yellow.

    ``hues`` holds the hue of every logo-area pixel that was not excluded
    (pixels which are too dark or too unsaturated are dropped by the caller).
    """
    # At *least* 30% of the logo area must have survived that filtering. In
    # reality far more should survive; this only rules out completely
    # irrelevant sections (and keeps mean() away from an empty list).
    required_samples = watermark_size[0] * watermark_size[1] * 0.3
    if len(hues) < required_samples:
        return False
    # the mean hue has to sit within three standard deviations of the reference
    drift = abs(statistics.mean(hues) - ifunny_yellow_mean)
    return drift <= 3 * ifunny_yellow_stdev
def check_grid(I, Y, X, threshold):
    """Count the pixels of a rectangle that are not ifunny black.

    ``I`` is the image (anything offering ``getpixel((x, y))``), ``Y`` and ``X``
    are ``(start, stop)`` pairs handed straight to ``range``, and ``threshold``
    is the per-channel tolerance passed on to ``is_within_black``.

    Returns ``(pixels, defects)``: the area of the rectangle and the number of
    pixels inside it that failed the black test.
    """
    area = (Y[1] - Y[0]) * (X[1] - X[0])
    mismatches = sum(
        1
        for row in range(*Y)
        for col in range(*X)
        if not is_within_black(I.getpixel((col, row)), threshold)
    )
    return (area, mismatches)
# watermark detection main-body | |
def detect(path):
    """Return True when the image at ``path`` carries an ifunny watermark.

    The ifunny watermark is a 20 pixel tall banner along the bottom edge. Most
    of it is black, with the yellow logo near the right edge. The banner is
    tall enough to contain at least a couple of complete 8x8 jpeg blocks that
    hold nothing but the ifunny black; those *should* encode the colour almost
    perfectly, so they are verified with the narrow threshold. The rest of the
    banner (incomplete blocks and the strips around the logo) is verified more
    loosely with the wide threshold. Finally the logo area itself must have a
    mean hue matching the ifunny yellow.

    Returns False for files PIL cannot identify, for images with fewer than
    three colour bands, and for images too small to hold the watermark. Other
    exceptions raised while reading the image propagate to the caller.
    """
    try:
        opened = Image.open(path)
    except UnidentifiedImageError:
        return False # I guess?

    # The context manager releases the underlying file on every return path
    # instead of leaving that to garbage collection.
    with opened as I:
        # image color mode check
        # Grayscale / palette images are rejected. The band-count test extends
        # that to every other mode with fewer than three bands (e.g. "I", "F"),
        # for which getpixel() yields a scalar that cannot be indexed by channel.
        if I.mode in {"1", "L", "LA", "La", "P", "PA"} or len(I.getbands()) < 3:
            return False

        # size check
        width, height = I.size
        if width < watermark_size[0] + watermark_offset or height < watermark_size[1]:
            if DEBUG: print("return false: too small")
            return False

        # geometry of the banner
        bar_top = height - watermark_size[1]
        logo_left = width - watermark_size[0] - watermark_offset
        logo_right = width - watermark_offset
        # bounds of the complete 8x8 blocks inside the black part of the banner
        pure_top = nearest_multiple_bias_up(bar_top, 8)
        pure_right = nearest_multiple_bias_down(logo_left, 8)

        # (label, rows, columns, per-channel threshold, tolerated defect fraction)
        regions = (
            # the good blocks within the watermark black bar
            ("pure block", (pure_top, height), (0, pure_right), black_threshold_narrow, 0.05),
            # area above the pure blocks
            ("top incomplete block", (bar_top, pure_top), (0, pure_right), black_threshold_wide, 0.10),
            # area left of the logo, right of the pure blocks
            ("logo left incomplete block", (bar_top, height), (pure_right, logo_left), black_threshold_wide, 0.10),
            # offset area to the right of the logo
            ("logo right incomplete block", (bar_top, height), (logo_right, width), black_threshold_wide, 0.10),
        )
        for label, rows, columns, threshold, tolerance in regions:
            pixels, defects = check_grid(I, rows, columns, threshold)
            if defects > pixels * tolerance:
                if DEBUG: print("too many defects in {} : {:.02f}%".format(label, defects / pixels * 100))
                return False

        # search the logo area: collect the hue of every pixel that is bright
        # and saturated enough to carry a meaningful hue
        h = []
        for y in range(bar_top, height):
            for x in range(logo_left, logo_right):
                rgb = I.getpixel((x, y))
                hue, saturation, value = colorsys.rgb_to_hsv(rgb[0] / 255, rgb[1] / 255, rgb[2] / 255)
                if value < .25 or saturation < .5:
                    continue
                h.append(hue)

        # watermark confirmed only if the logo area is within yellow
        return is_within_yellow(h)
# multiprocessing thread worker | |
# input: (filename, full_path) | |
# returns: (full_path, dest, failure) | |
def worker(job):
    """Sort a single file into the photo or meme output directory.

    ``job`` is a ``(filename, full_path)`` pair. Returns
    ``(full_path, dest, failure)`` where ``dest`` is the directory the file was
    copied to, or ``"-"`` when nothing was copied, and ``failure`` is 1 only
    when detection or the meme copy raised an exception.
    """
    f, full_path = job

    # a file of this name is already present in one of the outputs: skip it
    if any(os.path.exists(os.path.join(out, f)) for out in (Photos_Output, Memes_Output)):
        return (full_path, "-", 0)

    def copy_if_meme():
        # copy the file to the meme output when a watermark is detected;
        # None means "no watermark, nothing copied"
        if not detect(full_path):
            return None
        shutil.copy(full_path, Memes_Output)
        return (full_path, Memes_Output, 0)

    if f.lower().endswith((".jpg", ".png")):
        if __STDEBUG:
            # same as below but no try-except, so the traceback surfaces
            outcome = copy_if_meme()
        else:
            try:
                outcome = copy_if_meme()
            except Exception as E:
                print("\r\033[K{} ---- unexpected exception".format(job))
                print(E)
                return (full_path, "-", 1)
        if outcome is not None:
            return outcome

    # not a jpg / png or no watermark detected
    shutil.copy(full_path, Photos_Output)
    return (full_path, Photos_Output, 0)
def main():
    """Entry point: sort every file under ``Photos_Source`` into the outputs.

    With ``DEBUG`` set, the detector is instead run over the
    "false_negatives" directory and the result printed for each file. With
    ``__STDEBUG`` set, the jobs are processed one after another in this
    process; otherwise they are spread over a pool with one process per CPU
    and a progress line is printed. The program exits with status 1 as soon as
    a pooled job reports a failure.
    """
    if DEBUG:
        # For debugging I'm running the detector on a few selected false negatives
        for path, dirs, files in os.walk("false_negatives"):
            for name in files:
                candidate = os.path.join(path, name)
                print()
                print(detect(candidate), candidate)
        return

    # one (filename, full_path) job per file found under the source tree
    jobs = [
        (name, os.path.join(path, name))
        for path, dirs, files in os.walk(Photos_Source)
        for name in files
    ]

    for dest in (Photos_Output, Memes_Output):
        if not os.path.exists(dest):
            os.makedirs(dest)

    if __STDEBUG:
        for job in jobs:
            print(job)
            worker(job)
        return

    threads = multiprocessing.cpu_count()
    print("processing with {} threads".format(threads))
    total = len(jobs)
    with multiprocessing.Pool(processes=threads) as pool:
        results = pool.imap_unordered(worker, jobs)
        for done, (full_path, dest, failure) in enumerate(results, start=1):
            # "\r\033[K" returns to the line start and clears it, so the progress line is redrawn in place
            print("\r\033[K{:.02f}% {:,}/{:,} {}".format(100 * done / total, done, total, (full_path, dest)), end="")
            if failure:
                print (" unexpected exception")
                sys.exit(1)


if __name__ == "__main__": # windows support for multiprocessing
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment