Skip to content

Instantly share code, notes, and snippets.

@jeremy-rifkin
Last active June 28, 2023 11:58
Show Gist options
  • Save jeremy-rifkin/4a39f6f111f53fc44a10ae2c3f0329cc to your computer and use it in GitHub Desktop.
Save jeremy-rifkin/4a39f6f111f53fc44a10ae2c3f0329cc to your computer and use it in GitHub Desktop.
A script to detect photos with "ifunny" watermarks and filter them from other photos.
# This is a program to filter photos with ifunny watermarks.
# I wrote this script for personal use to separate saved ifunny memes from other photos on my phone.
# The script is fully parallelized (one worker process per CPU) and benefits from technologies
# like Intel hyperthreading because a substantial share of run time is spent in I/O.
# I/O is expected to be the bottleneck.
#
# Watermark false positive rate appears to be < 1/500
# Watermark false negative rate appears to be ~ 1/500
# False positive / negative rate checked with manual spot check sample size = 500.
#
# Rifkin 2020
import colorsys
from PIL import Image, UnidentifiedImageError
import math
import statistics
import multiprocessing
import os
import shutil
import sys
# Photos source and output directories.
# Photos_Source is walked recursively by main(); worker() copies files detected as ifunny
# memes into Memes_Output and every other file into Photos_Output.
Photos_Source = "<...>"
Photos_Output = "<...>"
Memes_Output = "<...>"
# DEBUG makes detect() print why an image was rejected and makes main() run the detector on
# the "false_negatives" directory instead of Photos_Source.
DEBUG = False
__STDEBUG = False # Single-threaded debug mode for exceptions in the worker thread
# Black color detection parameters.
# ifunny_black is the reference color of the watermark banner; is_within_black() compares the
# first three channels of a pixel against it with a per-channel tolerance.
ifunny_black = (23, 22, 27)
# narrow tolerance: used by detect() for the area made of complete 8x8 jpeg blocks
black_threshold_narrow = 25 # 15 (presumably an earlier value -- TODO confirm)
# wide tolerance: used by detect() for the partial blocks around the logo
black_threshold_wide = 60
# Yellow detection parameters (hue as returned by colorsys.rgb_to_hsv).
# is_within_yellow() accepts a logo area whose mean hue lies within 3 standard deviations of
# the mean: mean hue = 0.1247 +/- 3 * 0.00988673
ifunny_yellow_mean = 0.1247
ifunny_yellow_stdev = 0.00988673
# Watermark size parameters, in pixels.
# watermark_size is (width, height) of the logo area at the bottom of the image;
# watermark_offset is the gap detect() expects between the logo area and the right image edge.
watermark_size = (135, 20)
watermark_offset = 7
# utilities for finding jpg blocks
def nearest_multiple_bias_up(n, r):
    """Round ``n`` up to the nearest multiple of ``r``.

    A value that is already a multiple of ``r`` is returned unchanged.
    """
    quotient, remainder = divmod(n, r)
    if remainder > 0:
        # n sits inside a block: move to the start of the next one
        quotient += 1
    return quotient * r
def nearest_multiple_bias_down(n, r):
    """Round ``n`` down to the nearest multiple of ``r``.

    A value that is already a multiple of ``r`` is returned unchanged.
    """
    return n - n % r
# utilities for color detection
def is_within_black(color, threshold):
    """Return True if ``color`` is close to the ifunny banner black.

    Only the first three channels of ``color`` are compared against
    ``ifunny_black``; any further channel (e.g. alpha) is ignored.
    Every compared channel must differ from the reference by at most
    ``threshold``.
    """
    return all(
        abs(color[channel] - ifunny_black[channel]) <= threshold
        for channel in range(3)
    )
def is_within_yellow(hues):
    """Return True if the collected hues match the yellow ifunny logo.

    ``hues`` holds the hue of every logo-area pixel that the caller kept
    (pixels which are too dark are excluded by the caller).

    The sample is rejected when it is too small or when its mean hue is
    more than 3 standard deviations away from ``ifunny_yellow_mean``.
    """
    # at *least* 30% of the pixels in the watermark range should have been picked up by the
    # code; in reality much more than 30% should have been picked up but this will just help
    # exclude completely irrelevant sections
    minimum_samples = watermark_size[0] * watermark_size[1] * 0.3
    if len(hues) < minimum_samples:
        return False
    deviation = abs(statistics.mean(hues) - ifunny_yellow_mean)
    if deviation > 3 * ifunny_yellow_stdev:
        return False
    return True
def check_grid(I, Y, X, threshold):
    """Count the pixels of a rectangle that are not ifunny black.

    I: image object; pixels are read with ``I.getpixel((x, y))``.
    Y: (start, stop) row range, ``stop`` exclusive.
    X: (start, stop) column range, ``stop`` exclusive.
    threshold: per-channel tolerance handed to ``is_within_black``.

    Returns ``(pixels, defects)``: the area of the rectangle and the number
    of pixels in it that failed the black check.
    """
    pixels = (Y[1] - Y[0]) * (X[1] - X[0])
    defects = sum(
        1
        for y in range(*Y)
        for x in range(*X)
        if not is_within_black(I.getpixel((x, y)), threshold)
    )
    return (pixels, defects)
# watermark detection main-body
def detect(path):
    """Return True if the image at ``path`` carries an ifunny watermark.

    The ifunny watermark is a 20 pixel tall banner at the bottom of the image.
    Most of the banner is black, with the yellow logo near the right edge.
    The 20 pixel height should allow at least a couple of complete 8x8 jpeg
    blocks that contain nothing other than the ifunny black. These blocks
    *should* encode the black perfectly; in practice the colors are slightly
    off, so they are verified with the narrow threshold. The rest of the banner
    (impure blocks and the margins around the logo) is verified with the more
    generous wide threshold. Finally the logo area itself must be yellow.

    Returns False when PIL cannot identify the file, when the image has fewer
    than three bands, when it is too small to hold the watermark, or when any
    of the checks above fails.
    """
    try:
        image = Image.open(path)
    except UnidentifiedImageError:
        return False # I guess?
    # use the image as a context manager so the underlying file is closed on every return path
    with image as I:
        # image color mode check
        # the pixel checks below index three channels, so besides the explicitly listed
        # grayscale / palette modes any other mode with fewer than 3 bands is rejected too
        if I.mode in {"1", "L", "LA", "La", "P", "PA"} or len(I.getbands()) < 3:
            return False
        # size check
        width, height = I.size
        if width < watermark_size[0] + watermark_offset or height < watermark_size[1]:
            if DEBUG: print("return false: too small")
            return False
        banner_top = height - watermark_size[1]
        logo_left = width - watermark_size[0] - watermark_offset
        logo_right = width - watermark_offset
        # boundaries of the complete 8x8 jpeg blocks inside the black part of the banner
        pure_top = nearest_multiple_bias_up(banner_top, 8)
        pure_right = nearest_multiple_bias_down(logo_left, 8)
        # (debug label, row range, column range, black threshold, tolerated defect ratio)
        regions = (
            # the good blocks within the watermark black bar
            ("pure block", (pure_top, height), (0, pure_right), black_threshold_narrow, 0.05),
            # area above the pure blocks
            ("top incomplete block", (banner_top, pure_top), (0, pure_right), black_threshold_wide, 0.10),
            # area left of the watermark, right of the pure blocks
            ("logo left incomplete block", (banner_top, height), (pure_right, logo_left), black_threshold_wide, 0.10),
            # offset area to the right of the logo
            ("logo right incomplete block", (banner_top, height), (logo_right, width), black_threshold_wide, 0.10),
        )
        for label, rows, columns, threshold, tolerance in regions:
            pixels, defects = check_grid(I, rows, columns, threshold)
            if defects > pixels * tolerance:
                if DEBUG:
                    print("too many defects in {} : {:.02f}%".format(label, defects / pixels * 100))
                return False
        # search the logo area: collect the hue of every pixel that is bright and saturated
        # enough to carry a meaningful hue
        hues = []
        for y in range(banner_top, height):
            for x in range(logo_left, logo_right):
                rgb = I.getpixel((x, y))
                hue, saturation, value = colorsys.rgb_to_hsv(rgb[0] / 255, rgb[1] / 255, rgb[2] / 255)
                if value < .25 or saturation < .5:
                    continue
                hues.append(hue)
        # watermark only if the logo area is within yellow
        return is_within_yellow(hues)
# multiprocessing thread worker
# input: (filename, full_path)
# returns: (full_path, dest, failure)
def worker(job):
    """Sort a single file into the memes or the photos output directory.

    job: ``(filename, full_path)``.

    Returns ``(full_path, dest, failure)``. ``dest`` is the directory the file
    was copied to, or ``"-"`` when nothing was copied. ``failure`` is 1 when
    detection or the copy to ``Memes_Output`` raised, otherwise 0.

    With ``__STDEBUG`` set, exceptions are not swallowed but re-raised so the
    full traceback is visible.
    """
    f, full_path = job
    # a file with this name already exists in one of the outputs: leave it alone
    if os.path.exists(os.path.join(Photos_Output, f)) or os.path.exists(os.path.join(Memes_Output, f)):
        return (full_path, "-", 0)
    # only image files are run through the detector (".jpeg" is the same format as ".jpg")
    if f.lower().endswith((".jpg", ".jpeg", ".png")):
        try:
            if detect(full_path):
                shutil.copy(full_path, Memes_Output)
                return (full_path, Memes_Output, 0)
        except Exception as E:
            if __STDEBUG:
                raise
            # "\r\033[K" clears the progress line printed by main() before reporting
            print("\r\033[K{} ---- unexpected exception".format(job))
            print(E)
            return (full_path, "-", 1)
    # not a jpg / png or no watermark detected
    shutil.copy(full_path, Photos_Output)
    return (full_path, Photos_Output, 0)
def main():
    """Run the watermark filter.

    In ``DEBUG`` mode the detector is run on every file below the
    "false_negatives" directory and the result is printed; nothing is copied.

    Otherwise every file below ``Photos_Source`` is handed to ``worker``,
    either sequentially (``__STDEBUG``) or through a process pool with one
    process per CPU. The process exits with status 1 as soon as a worker
    reports a failure.
    """
    if DEBUG:
        # For debugging I'm running the detector on a few selected false negatives
        for path, _dirs, files in os.walk("false_negatives"):
            for f in files:
                full_path = os.path.join(path, f)
                print()
                print(detect(full_path), full_path)
        return
    # one job per file: (filename, full_path)
    jobs = [
        (f, os.path.join(path, f))
        for path, _dirs, files in os.walk(Photos_Source)
        for f in files
    ]
    for dest in (Photos_Output, Memes_Output):
        os.makedirs(dest, exist_ok=True)
    if __STDEBUG:
        for job in jobs:
            print(job)
            worker(job)
        return
    threads = multiprocessing.cpu_count()
    print("processing with {} threads".format(threads))
    total = len(jobs)
    with multiprocessing.Pool(processes=threads) as pool:
        for i, ret in enumerate(pool.imap_unordered(worker, jobs)):
            full_path, dest, failure = ret
            # "\r\033[K" rewrites the same terminal line with the current progress
            print("\r\033[K{:.02f}% {:,}/{:,} {}".format(100 * (i + 1) / total, i + 1, total, (full_path, dest)), end="")
            if failure:
                print (" unexpected exception")
                sys.exit(1)
    # terminate the progress line so the shell prompt starts on a fresh line
    print()
if __name__ == "__main__": # windows support for multiprocessing
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment