Created
February 23, 2017 01:58
-
-
Save realduke2000/4aecae3081211efb3381a9729600c6ba to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
# http://www.codeproject.com/Articles/43665/ExifLibrary-for-NET
# http://www.exiv2.org/tags.html
# http://www.awaresystems.be/imaging/tiff/tifftags.html
import os | |
import struct | |
import random | |
import datetime | |
import sys | |
import traceback | |
import getopt | |
import shutil | |
import logging | |
class exiftags:
    """Numeric ids of the TIFF/EXIF tags this script looks up."""

    # Date/time tags (values are ASCII "YYYY:MM:DD HH:MM:SS" strings).
    datetime = 0x0132
    datetime_original = 0x9003
    datetime_digited = 0x9004

    # Tag in the 0th IFD whose value is the offset of the EXIF IFD.
    exifpointer = 0x8769
class Action:
    """What to do with each picture once its destination path is known."""

    NoAction = 0x00  # leave the file where it is
    Copy = 0x01      # copy the file to the destination
    Move = 0x02      # move the file to the destination
class Option:
    """Run-time settings handed to the directory visitor."""

    def __init__(self):
        # Source and destination directories start out empty; main() fills them in.
        self.srcDir = self.destDir = ""
        # Nothing is copied or moved until an action is chosen.
        self.action = Action.NoAction
class Jpg:
    """Minimal reader for EXIF tags stored in a JPEG file's APP1 segment.

    Usage: build a dict whose keys are the EXIF tag ids of interest (values
    ``None``), pass it to :meth:`getEXIF`, then read the raw values back from
    that dict (also reachable as ``self.exif``).  Tags that were not found
    keep the value ``None``.

    Values are returned as raw bytes exactly as stored in the file; a value
    that fits in the 4-byte value/offset slot is returned as those 4 bytes.
    """

    # Tag in the 0th IFD pointing at the EXIF IFD (same value as
    # exiftags.exifpointer); kept here so the reader is self-contained.
    _EXIF_IFD_POINTER = 0x8769

    # Bytes per component for each TIFF 6.0 field type:
    # BYTE, ASCII, SHORT, LONG, RATIONAL, SBYTE, UNDEFINED, SSHORT, SLONG,
    # SRATIONAL, FLOAT, DOUBLE.
    _TYPE_SIZES = {1: 1, 2: 1, 3: 2, 4: 4, 5: 8, 6: 1,
                   7: 1, 8: 2, 9: 4, 10: 8, 11: 4, 12: 8}

    def __init__(self, file_path):
        """Remember *file_path*; the file is opened lazily on first read."""
        self.__file_path = file_path
        self.__fo = None
        self.__endian = '>'
        self.__baseoffset = None
        self.exif = {}

    def __del__(self):
        self.close()

    def close(self):
        """Close the underlying file if it is open (safe to call repeatedly)."""
        fo = self.__fo
        self.__fo = None
        if fo is not None:
            fo.close()

    def __getfo(self):
        """Return the binary file object, opening it on first use."""
        if self.__fo is None:
            self.__fo = open(self.__file_path, 'rb')
        return self.__fo

    def __read_exact(self, size):
        """Read exactly *size* bytes or raise IOError on a truncated file."""
        data = self.__getfo().read(size)
        if len(data) != size:
            raise IOError("unexpected end of file in " + str(self.__file_path))
        return data

    def __read_uint(self, code):
        """Read one unsigned integer (struct code 'H' or 'L') in TIFF byte order."""
        fmt = self.__endian + code
        return struct.unpack(fmt, self.__read_exact(struct.calcsize(fmt)))[0]

    def __at_marker(self, marker):
        """Consume the next two bytes if they equal *marker*, else rewind.

        A short read (end of file) counts as "marker not present".
        """
        fo = self.__getfo()
        pos = fo.tell()
        if fo.read(2) == marker:
            return True
        fo.seek(pos, 0)
        return False

    def __isjpg(self):
        """Return True when the file starts with the JPEG SOI marker FF D8."""
        return self.__getfo().read(2) == b'\xff\xd8'

    def __read_app0_section(self):
        """Skip the APP0 segment if one is present at the current position."""
        if not self.__at_marker(b'\xff\xe0'):
            return
        size = struct.unpack('>H', self.__read_exact(2))[0]  # always big-endian
        if size < 2:
            raise IOError("invalid APP0 segment length in " + str(self.__file_path))
        fo = self.__getfo()
        # The length field counts its own two bytes, which were just consumed.
        fo.seek(fo.tell() + size - 2, 0)

    def __read_app1_section(self):
        """Parse an EXIF APP1 segment at the current position, filling self.exif.

        Returns quietly when there is no APP1 segment, when it does not carry
        EXIF data, or when no EXIF IFD pointer is found.  Raises IOError for a
        truncated segment or an unknown byte-order mark.
        """
        if not self.__at_marker(b'\xff\xe1'):
            return
        self.__read_exact(2)  # segment length; not needed for parsing
        if self.__read_exact(6) != b'Exif\x00\x00':
            logging.debug("NOT EXIF!")
            return

        fo = self.__getfo()
        # All TIFF offsets are relative to the start of the TIFF header.
        self.__baseoffset = fo.tell()

        byte_order = self.__read_exact(2)
        if byte_order == b'II':
            self.__endian = '<'
        elif byte_order == b'MM':
            self.__endian = '>'
        else:
            logging.debug("Failed to get big-/little-endian")
            raise IOError("unknown TIFF byte order in " + str(self.__file_path))

        self.__read_exact(2)  # TIFF marker, should always be 0x002A
        ifd0_offset = self.__read_uint('L')
        if ifd0_offset == 0:
            logging.debug("Read 0th ifd failed...")
            return

        # Scan the 0th IFD only for the pointer to the EXIF IFD.
        pointer = {self._EXIF_IFD_POINTER: None}
        fo.seek(self.__baseoffset + ifd0_offset, 0)
        self.__read_IFD(pointer)
        raw_pointer = pointer[self._EXIF_IFD_POINTER]
        if raw_pointer is None:
            logging.debug("Read EXIF IFD offset failed...")
            return
        if len(raw_pointer) != 4:
            raise IOError("malformed EXIF IFD pointer in " + str(self.__file_path))

        exif_offset = struct.unpack(self.__endian + 'L', raw_pointer)[0]
        if exif_offset == 0:
            logging.debug("exif pointer is 0")
            return
        fo.seek(self.__baseoffset + exif_offset, 0)
        self.__read_IFD(self.exif)

    def __read_IFD(self, tags):
        """Read every field of the IFD at the current position into *tags*."""
        for _ in range(self.__read_uint('H')):
            self.__read_IFD_Field(tags)

    def __read_IFD_Field(self, tags):
        """Read one 12-byte IFD entry; store its raw value if its tag is in *tags*."""
        tagid = self.__read_uint('H')
        field_type = self.__read_uint('H')
        count = self.__read_uint('L')
        value = self.__read_exact(4)  # inline value or offset to the data
        if tagid not in tags:
            return

        unit = self._TYPE_SIZES.get(field_type)
        if unit is None:
            # Unknown field type: its size cannot be computed, so skip it.
            logging.debug("skipping tag 0x%04x with unknown field type %d",
                          tagid, field_type)
            return

        nbytes = unit * count
        if nbytes > 4:
            # Data does not fit inline: the 4 bytes are an offset from the
            # TIFF header.  Restore the position so the IFD scan can go on.
            fo = self.__getfo()
            resume = fo.tell()
            offset = struct.unpack(self.__endian + 'L', value)[0]
            fo.seek(self.__baseoffset + offset, 0)
            try:
                value = self.__read_exact(nbytes)
            finally:
                fo.seek(resume, 0)
        tags[tagid] = value

    def getEXIF(self, tags):
        """Fill *tags* (tag id -> None) with raw EXIF values found in the file.

        *tags* is updated in place and also stored as ``self.exif``.  The file
        is closed again before returning, so it can be moved or deleted
        afterwards; a later call simply reopens it.

        Raises IOError if the file cannot be opened or its EXIF data is
        truncated or malformed.
        """
        self.exif = tags
        try:
            if not self.__isjpg():
                logging.debug("file " + self.__file_path + " is not jpg file")
                return
            self.__read_app0_section()
            self.__read_app1_section()
        finally:
            self.close()
def _exif_datetime(raw):
    """Parse a raw EXIF date/time value (bytes or text) into a datetime.

    Trailing NUL bytes are removed first.  Raises ValueError when the value
    is not in the 'YYYY:MM:DD HH:MM:SS' form.
    """
    if isinstance(raw, bytes):
        raw = raw.decode('ascii', 'replace')
    return datetime.datetime.strptime(raw.rstrip('\0'), '%Y:%m:%d %H:%M:%S')


def _unique_path(path):
    """Return *path*, or *path* with a numeric '_N' suffix, that does not exist yet."""
    stem, ext = os.path.splitext(path)
    candidate = path
    serial = 0
    while os.path.exists(candidate):
        serial += 1
        candidate = stem + "_" + str(serial) + ext
    return candidate


def visitjpg(option, dirname, names):
    """Copy or move every JPEG in one directory into a per-day folder.

    For each entry of *names* whose extension is .jpg or .jpeg (any case) the
    EXIF "date/time original" tag is read; the file is then copied or moved
    (per ``option.action``) to
    ``<option.destDir>/<YYYY-MM-DD>/<YYYY-MM-DD>_<HH-MM-SS>.jpg``.  Files
    without a usable EXIF date are logged and skipped.

    Args:
        option: settings object providing ``destDir`` and ``action``.
        dirname: directory that contains the entries in *names*.
        names: file names (not paths) inside *dirname*.

    I/O errors on a single file are logged and do not stop the loop; any
    other exception is logged and re-raised.
    """
    destdir = option.destDir
    for name in names:
        if os.path.splitext(name)[1].lower() not in ('.jpg', '.jpeg'):
            continue
        origpath = os.path.join(dirname, name)
        tags = {exiftags.datetime_original: None}
        try:
            reader = Jpg(origpath)
            try:
                reader.getEXIF(tags)
            finally:
                # Drop the reader so its file handle is released before the
                # file is moved.
                del reader

            raw = tags[exiftags.datetime_original]
            if raw is None:
                logging.debug("failed to get exif of: " + dirname + "/" + name)
                continue
            try:
                # The value read from the file ends with NUL bytes; the
                # helper trims them before parsing.
                dt = _exif_datetime(raw)
            except ValueError:
                logging.debug("invalid exif date in: " + dirname + "/" + name)
                continue

            # Get date aggregate folder
            day = dt.date().isoformat()
            dtdir = os.path.join(destdir, day)
            if not os.path.exists(dtdir):
                os.mkdir(dtdir)
            if not os.path.isdir(dtdir):
                logging.debug("failed to initialize dir: " + dtdir)
                continue

            clock = dt.time().isoformat().replace(":", "-")
            newpath = _unique_path(os.path.join(dtdir, day + "_" + clock + ".jpg"))

            if option.action == Action.Copy:
                logging.debug("copying " + origpath + " to " + newpath)
                shutil.copyfile(origpath, newpath)
            elif option.action == Action.Move:
                logging.debug("moving " + origpath + " to " + newpath)
                # shutil.move also works when src and dest are on different
                # file systems.
                shutil.move(origpath, newpath)
            else:
                logging.debug("error action on: " + origpath + ", newpath: " + newpath)
        except (IOError, OSError) as e:
            # Per-file failure: report it and carry on with the next file.
            logging.warning("IOError: failed to rename: " + dirname + "/" + name
                            + ' due to: ' + str(e))
        except Exception:
            logging.debug("Filed while rename: " + dirname + "/" + name)
            raise
def usage_and_exit():
    """Print the command-line help to stdout and exit with status 2."""
    script = __file__
    print(script + " Usage:")
    print("python " + script + " srcdir [destdir] [-h|--help] [-m|--move] [--debug]")
    print("\t-m|--move move files from src to dest, by default its action is copy")
    print("\t--debug enable debug logging")
    print("\tif two dir are specified, 1st dir is src dir, 2nd dir is dest dir")
    print("\tif only one dir is specified, the 1st dir is src dir, "
          "the directory containing this script is dest dir")
    # sys.exit is always available; the bare exit() helper is only injected
    # by the site module.
    sys.exit(2)
def main(argv):
    """Parse the command line and sort the pictures of a directory tree.

    Args:
        argv: command-line arguments without the program name.  Accepts
            ``-h/--help``, ``-m/--move`` and ``--debug`` plus one or two
            positional directories (source, then optional destination).
            With a single directory the destination is the directory that
            contains this script.

    Invalid options or directories print the usage text and exit.
    """
    try:
        optlist, args = getopt.getopt(argv, 'hm', ['help', 'move', 'debug'])
    except getopt.GetoptError:
        logging.error("getopt error")
        usage_and_exit()

    option = Option()
    option.action = Action.Copy
    loglevel = logging.INFO
    for flag, _unused in optlist:
        if flag in ('-h', '--help'):
            usage_and_exit()
        elif flag in ('-m', '--move'):
            option.action = Action.Move
        elif flag == '--debug':
            loglevel = logging.DEBUG
    logging.basicConfig(level=loglevel)

    # find src, dest dir
    if len(args) == 1:
        srcdir = args[0]
        destdir = os.path.dirname(os.path.abspath(__file__))
    elif len(args) == 2:
        srcdir, destdir = args
    else:
        usage_and_exit()

    if not os.path.isdir(srcdir):
        # Logged as an error so the reason is visible without --debug.
        logging.error("src: " + str(srcdir) + " is not a directory")
        usage_and_exit()

    start = datetime.datetime.now()
    if not os.path.exists(destdir):
        # makedirs also creates missing parent directories.
        os.makedirs(destdir)
    elif not os.path.isdir(destdir):
        logging.error("dest: " + str(destdir) + " is not a directory")
        usage_and_exit()

    option.srcDir = srcdir
    option.destDir = destdir
    # os.walk exists on both Python 2 and 3 (os.path.walk is Python 2 only);
    # only file names are handed to the visitor.
    for dirname, _subdirs, filenames in os.walk(srcdir):
        visitjpg(option, dirname, filenames)

    finished = datetime.datetime.now()
    logging.info("started when: %s, finished when: %s, cost: %s",
                 start, finished, finished - start)
# Script entry point: hand the command-line arguments (minus the program
# name) to main().
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    main(cli_args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment