Skip to content

Instantly share code, notes, and snippets.

@paul-schwendenman
Created August 21, 2015 15:27
Show Gist options
  • Save paul-schwendenman/ed727ac16f1b24277b36 to your computer and use it in GitHub Desktop.
Save paul-schwendenman/ed727ac16f1b24277b36 to your computer and use it in GitHub Desktop.
Image Fingerprinting project
'''Tool for analyzing images'''
from __future__ import print_function
from PIL import Image
import imagehash
import argparse
import shelve
import glob
import glob2
import uuid
import random
import shutil
import os
import collections
import subprocess
import math
import numpy
def index(args):
    '''Process and index a dataset for further inspection.

    Every ``*.jpg`` file directly inside ``args['dataset']`` is hashed with
    the difference, perceptual and average hash (hash size 64).  The shelve
    database at ``args['shelve']`` holds one dict per algorithm under the
    keys ``'dhash'``, ``'phash'`` and ``'ahash'``; each dict maps the string
    form of a hash to the list of file names that produced it.  Entries
    already present in the shelf are kept and appended to.
    '''
    # writeback=True lets the nested dicts be mutated in place; the cached
    # entries are written back when the shelf is closed
    db = shelve.open(args["shelve"], writeback=True)
    try:
        for hash_name in ('dhash', 'phash', 'ahash'):
            db[hash_name] = db.get(hash_name, {})

        # loop over the image dataset
        for imagePath in glob.iglob(os.path.join(args['dataset'], '*.jpg')):
            image = Image.open(imagePath)
            hashes = (
                ('dhash', imagehash.dhash(image, 64)),
                ('phash', imagehash.phash(image, 64)),
                ('ahash', imagehash.average_hash(image, 64)),
            )

            # basename() copes with whatever separator os.path.join used,
            # unlike searching for a literal "/"
            filename = os.path.basename(imagePath)

            # use the hash as the key and append the file name to the
            # list of files sharing that hash
            for hash_name, value in hashes:
                db[hash_name].setdefault(str(value), []).append(filename)
    finally:
        # close the shelf even when an image fails to load or hash
        db.close()
def hex_to_hash(hexstr):
    """
    Convert a stored hash (hex, as retrieved from str(Imagehash))
    back to a Imagehash object.

    The string is read two hex digits (one byte) at a time.  Each byte
    supplies eight consecutive cells of the flattened, row-major bit
    matrix, least significant bit first.  For an 8x8 hash this is one byte
    per row, which is the only case the previous implementation decoded
    correctly; larger square hashes are now handled as well.

    NOTE(review): this layout mirrors what the previous decoder assumed.
    Confirm that str(ImageHash) of the installed imagehash release uses
    the same byte/bit order.

    Raises ValueError when the number of encoded bits is not a perfect
    square, when the string has an odd number of digits, or when it
    contains non-hex characters.
    """
    n_bits = len(hexstr) * 4
    hash_size = int(math.sqrt(n_bits))
    if hash_size ** 2 != n_bits or len(hexstr) % 2:
        raise ValueError(
            'The hex string has the wrong length '
            '({} hex digits cannot form a square hash)'.format(len(hexstr)))

    bits = []
    for start in range(0, len(hexstr), 2):
        byte = int("0x" + hexstr[start:start + 2], 16)
        bits.extend(byte & (1 << position) > 0 for position in range(8))

    matrix = numpy.array(bits, dtype=bool).reshape((hash_size, hash_size))
    return imagehash.ImageHash(matrix)
def supersearch(args):
    '''Search the database for similar files.

    Hashes ``args['query']`` with the algorithm named by
    ``args['hash_name']`` (hash size 64), compares it against every hash
    stored for that algorithm in the shelve database ``args['shelve']`` and
    prints the first file of every entry whose distance is at most
    ``args['threshold']``.  When at least one entry qualifies, the matching
    files (resolved against ``args['dataset']``) are opened with ``feh -F``.

    Raises ValueError when ``args['hash_name']`` is not one of ``'dhash'``,
    ``'phash'`` or ``'ahash'``.
    '''
    # database key -> name of the imagehash function that produces it
    hash_functions = {
        'dhash': 'dhash',
        'phash': 'phash',
        'ahash': 'average_hash',
    }
    hash_name = args['hash_name']
    if hash_name not in hash_functions:
        # reject bad input before touching the database or the image
        raise ValueError('unknown hash name: {!r}'.format(hash_name))

    # open the shelve database
    db = shelve.open(args["shelve"])
    try:
        query = Image.open(args["query"])
        h = getattr(imagehash, hash_functions[hash_name])(query, 64)
        db_hash = db[hash_name]

        # diagnostic: how many stored hashes there are per hex length
        print(collections.Counter(len(key) for key in db_hash).most_common())

        distances = [(h - hex_to_hash(key), key) for key in db_hash]

        # diagnostic: histogram of distances, ordered by distance
        histogram = collections.Counter(distance for distance, _ in distances)
        print(sorted(histogram.items()))

        command = ['feh', '-F']
        for strength, key in sorted(distances, key=lambda pair: pair[0]):
            if strength > args['threshold']:
                # sorted by ascending distance, so nothing further can match
                break
            names = db_hash[key]
            print('{} count: {} stength: {}'.format(names[0], len(names), strength))
            command.append(os.path.join(args["dataset"], names[0]))
    finally:
        # close the shelf whether or not the search succeeded
        db.close()

    # only launch the viewer when there is something to show
    if len(command) > 2:
        subprocess.call(command)
def search(args):
    '''Search the database for matching files.

    Computes the difference hash of ``args['query']`` and prints the path
    (relative to ``args['dataset']``) of every indexed file stored under
    exactly the same hash in the shelve database ``args['shelve']``.
    '''
    # open the shelve database
    db = shelve.open(args["shelve"])
    try:
        # load the query image and hash it the same way index() does:
        # hash size 64, stored in the 'dhash' sub-dictionary
        query = Image.open(args["query"])
        h = str(imagehash.dhash(query, 64))

        # an unknown hash (or a shelf without a 'dhash' table) simply
        # yields no matches instead of a KeyError
        filenames = db.get('dhash', {}).get(h, [])
        print("Found %d images" % (len(filenames)))

        # loop over the images
        for filename in filenames:
            print(os.path.join(args["dataset"], filename))
    finally:
        # close the shelve database even if hashing fails
        db.close()
def gather(args):
    '''Grab subset of files from CALTECH or other dataset.

    Copies every ``*/*.jpg`` found under ``args['input']`` into
    ``args['output']`` under a random UUID file name.  Occasionally an image
    is additionally copied 1 to 8 more times under further random names;
    for each such image a ``filename,count`` line is written to the CSV
    file ``args['csv']``.
    '''
    # the with-block closes the CSV file even when a copy fails
    with open(args["csv"], "w") as output:
        # loop over the input images
        for imagePath in glob2.iglob(os.path.join(args["input"], "*/*.jpg")):
            # generate a random filename for the image and copy it to
            # the output location
            filename = str(uuid.uuid4()) + ".jpg"
            shutil.copy(imagePath, os.path.join(args["output"], filename))

            # randint() includes both end points, so this is a 1 in 501
            # chance that multiple copies of this image will be used
            if random.randint(0, 500) != 0:
                continue

            # pick the number of duplicates and record it in the CSV file
            numTimes = random.randint(1, 8)
            output.write("%s,%d\n" % (filename, numTimes))

            # NOTE: an earlier version built a randomly resized image here
            # but never saved it; the files written were always exact
            # copies, so that unused step was dropped.
            for _ in range(numTimes):
                # generate a random filename for the duplicate and copy
                # the image to the output directory
                adjFilename = str(uuid.uuid4()) + ".jpg"
                shutil.copy(imagePath, os.path.join(args["output"], adjFilename))
def main(argv=None):
    '''Main function: parse the command line and run the chosen sub-command.

    ``argv`` is the list of command-line arguments to parse; when it is
    None, argparse falls back to ``sys.argv[1:]``.  Exactly one of the
    sub-commands ``index``, ``search``, ``supersearch`` or ``gather`` must
    be given, otherwise argparse reports a usage error and exits.
    '''
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="subparser_name")
    # sub-commands are optional by default on Python 3, which would make a
    # bare invocation silently do nothing
    subparsers.required = True

    # arguments of the 'index' sub-command
    index_parser = subparsers.add_parser('index')
    index_parser.add_argument("-d", "--dataset",
                              required=True,
                              help="path to input dataset of images")
    index_parser.add_argument("-s", "--shelve",
                              required=True,
                              help="output shelve database")

    # arguments of the 'search' sub-command
    search_parser = subparsers.add_parser('search')
    search_parser.add_argument("-d", "--dataset",
                               required=True,
                               help="path to dataset of images")
    search_parser.add_argument("-s", "--shelve",
                               required=True,
                               help="output shelve database")
    search_parser.add_argument("-q", "--query",
                               required=True,
                               help="path to the query image")

    # arguments of the 'supersearch' sub-command
    super_search_parser = subparsers.add_parser('supersearch')
    super_search_parser.add_argument("-d", "--dataset",
                                     required=True,
                                     help="path to dataset of images")
    super_search_parser.add_argument("-s", "--shelve",
                                     required=True,
                                     help="output shelve database")
    super_search_parser.add_argument("-q", "--query",
                                     required=True,
                                     help="path to the query image")
    super_search_parser.add_argument('-t', '--threshold',
                                     type=int, default=12,
                                     help='minimum match threshold')
    super_search_parser.add_argument('--hash-name',
                                     type=str, default='dhash',
                                     choices=('dhash', 'phash', 'ahash'),
                                     help='set hash function to use for fingerprints')

    # arguments of the 'gather' sub-command
    gather_parser = subparsers.add_parser('gather')
    gather_parser.add_argument("-i", "--input",
                               required=True,
                               help="input directory of images")
    gather_parser.add_argument("-o", "--output",
                               required=True,
                               help="output directory")
    gather_parser.add_argument("-c", "--csv",
                               required=True,
                               help="path to CSV file for image counts")

    args = vars(parser.parse_args(argv))

    # the handlers are looked up only after parsing succeeded
    handlers = {
        'index': index,
        'search': search,
        'supersearch': supersearch,
        'gather': gather,
    }
    handlers[args['subparser_name']](args)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment