Skip to content

Instantly share code, notes, and snippets.

@realduke2000
Created February 23, 2017 01:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save realduke2000/4aecae3081211efb3381a9729600c6ba to your computer and use it in GitHub Desktop.
Save realduke2000/4aecae3081211efb3381a9729600c6ba to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# http://www.codeproject.com/Articles/43665/ExifLibrary-for-NET
# http://www.exiv2.org/tags.html
# http://www.awaresystems.be/imaging/tiff/tifftags.html
import os
import struct
import random
import datetime
import sys
import traceback
import getopt
import shutil
import logging
class exiftags:
    """Numeric IDs of the TIFF/EXIF tags this script works with.

    The values follow the tag tables referenced at the top of the file.
    """

    datetime = 0x0132            # DateTime (IFD0)
    datetime_original = 0x9003   # DateTimeOriginal (Exif IFD)
    # Historical, misspelled name; kept so existing callers keep working.
    datetime_digited = 0x9004    # DateTimeDigitized (Exif IFD)
    # Correctly spelled alias for the same tag.
    datetime_digitized = datetime_digited
    exifpointer = 0x8769         # offset of the Exif IFD, stored in IFD0
class Action:
    """Codes selecting what is done with each picture that gets processed."""

    # No operation selected / copy the file / move the file.
    NoAction, Copy, Move = 0x00, 0x01, 0x02
class Option:
    """Settings handed to the directory visitor: source, destination, action."""

    def __init__(self):
        # Nothing is selected yet: empty directories and no action.
        self.action = Action.NoAction
        self.srcDir = self.destDir = ""
class Jpg:
    """Minimal reader for selected EXIF tags of a JPEG file.

    Only the layout this script needs is understood: the SOI marker,
    an optional APP0 segment directly after it, and an APP1 "Exif"
    segment directly after that.  Inside APP1 the 0th IFD is scanned for
    the Exif IFD pointer and the Exif IFD is then scanned for the tags
    the caller asked for.

    All file data is handled as bytes, so the class behaves the same on
    Python 2 and Python 3.  Truncated or malformed structures raise
    ``IOError``.
    """

    # Size in bytes of one component of each TIFF field type.
    _TYPE_SIZES = {
        1: 1,    # BYTE
        2: 1,    # ASCII
        3: 2,    # SHORT
        4: 4,    # LONG
        5: 8,    # RATIONAL
        6: 1,    # SBYTE
        7: 1,    # UNDEFINED
        8: 2,    # SSHORT
        9: 4,    # SLONG
        10: 8,   # SRATIONAL
        11: 4,   # FLOAT
        12: 8,   # DOUBLE
    }

    def __init__(self, file_path):
        """Remember *file_path*; the file is opened lazily on first use."""
        self.__file_path = file_path
        self.__fo = None
        self.__endian = '>'        # struct byte-order prefix of the TIFF data
        self.__baseoffset = None   # file position of the TIFF header
        self.exif = {}

    def __del__(self):
        self.__close()

    def __close(self):
        """Close the underlying file if it is open."""
        if self.__fo is not None:
            self.__fo.close()
            self.__fo = None

    def __getfo(self):
        """Return the binary file object, opening it on first use."""
        if self.__fo is None:
            self.__fo = open(self.__file_path, 'rb')
        return self.__fo

    def __read_exact(self, size):
        """Read exactly *size* bytes or raise IOError on a short read."""
        data = self.__getfo().read(size)
        if len(data) != size:
            raise IOError("unexpected end of file: %s" % (self.__file_path,))
        return data

    def __isjpg(self):
        """Return True when the next two bytes are the JPEG SOI marker."""
        return self.__getfo().read(2) == b'\xff\xd8'

    def __read_app0_section(self):
        """Skip an APP0 segment if one starts at the current position."""
        fo = self.__getfo()
        pos = fo.tell()
        if fo.read(2) != b'\xff\xe0':
            fo.seek(pos, 0)
            return
        size = struct.unpack('>H', self.__read_exact(2))[0]  # big-endian
        if size < 2:
            raise IOError("invalid APP0 segment size: %d" % size)
        # The size field counts its own two bytes, which are already consumed.
        fo.seek(fo.tell() + size - 2, 0)

    def __read_app1_section(self):
        """Parse an APP1 Exif segment if one starts at the current position."""
        fo = self.__getfo()
        pos = fo.tell()
        if fo.read(2) != b'\xff\xe1':
            fo.seek(pos, 0)
            return
        self.__read_exact(2)  # segment size, not needed for parsing
        if fo.read(6) != b'Exif\x00\x00':
            logging.debug("NOT EXIF!")
            return
        # Every offset inside the TIFF structure is relative to this position.
        self.__baseoffset = fo.tell()
        byteorder = fo.read(2)
        if byteorder == b'II':
            self.__endian = '<'
        elif byteorder == b'MM':
            self.__endian = '>'
        else:
            logging.debug("Failed to get big-/little-endian")
            raise IOError("invalid TIFF byte-order mark")
        self.__read_exact(2)  # TIFF marker, should always be 0x002A
        # Read 0th IFD
        nextifd = struct.unpack(self.__endian + 'L', self.__read_exact(4))[0]
        if nextifd == 0:
            logging.debug("Read 0th ifd failed...")
            return
        exifpointer = {exiftags.exifpointer: None}
        fo.seek(self.__baseoffset + nextifd, 0)
        self.__read_IFD(exifpointer)
        # Read EXIF IFD
        raw = exifpointer[exiftags.exifpointer]
        if raw is None or len(raw) != 4:
            logging.debug("Read EXIF IFD offset failed...")
            return
        nextifd = struct.unpack(self.__endian + 'L', raw)[0]
        if nextifd == 0:
            logging.debug("exif pointer is 0")
            return
        fo.seek(self.__baseoffset + nextifd, 0)
        self.__read_IFD(self.exif)

    def __read_IFD(self, tags):
        """Read the IFD at the current position, filling requested *tags*."""
        fieldcount = struct.unpack(self.__endian + 'H', self.__read_exact(2))[0]
        for _ in range(fieldcount):
            self.__read_IFD_Field(tags)

    def __read_IFD_Field(self, tags):
        """Read one 12-byte IFD entry; store its value if its tag is in *tags*.

        Values longer than four bytes are fetched from the offset stored in
        the entry.  Shorter values are stored as the raw four-byte value
        field, padding included.
        """
        fo = self.__getfo()
        tagid, field_type, count = struct.unpack(
            self.__endian + 'HHL', self.__read_exact(8))
        value = self.__read_exact(4)  # inline value, or offset to the value
        if tagid not in tags:
            return
        unit = self._TYPE_SIZES.get(field_type)
        if unit is None:
            # Unknown type: its length cannot be computed, so leave the tag unset.
            logging.debug("unsupported field type %d for tag 0x%04x"
                          % (field_type, tagid))
            return
        n = unit * count  # byte length of the field data
        if n > 4:
            pos = fo.tell()
            offset = struct.unpack(self.__endian + 'L', value)[0]
            fo.seek(self.__baseoffset + offset, 0)
            value = self.__read_exact(n)
            fo.seek(pos, 0)  # continue with the next entry
        tags[tagid] = value

    def getEXIF(self, tags):
        """Fill *tags* with the raw values of the requested Exif IFD tags.

        Args:
            tags: dict whose keys are the wanted tag IDs.  It is updated in
                place and also becomes ``self.exif``.  Tags that are not
                found keep their current value.

        Raises:
            IOError: if the file cannot be opened or its structure is
                truncated or malformed.
        """
        try:
            self.exif = tags
            if not self.__isjpg():
                logging.debug("file " + self.__file_path + " is not jpg file")
                return
            self.__read_app0_section()
            self.__read_app1_section()
        finally:
            # Release the handle right away; a later call reopens the file
            # from the start.
            self.__close()
def _next_free_path(path):
    """Return *path*, or the first ``<stem>_<n><ext>`` variant that is unused."""
    if not os.path.exists(path):
        return path
    stem, ext = os.path.splitext(path)
    counter = 1
    while True:
        candidate = "%s_%d%s" % (stem, counter, ext)
        if not os.path.exists(candidate):
            return candidate
        counter += 1


def visitjpg(option, dirname, names):
    """Copy or move the ``.jpg`` files of one directory into per-date folders.

    For every entry of *names* whose extension is ``.jpg`` (any case) the
    EXIF DateTimeOriginal value is read.  The file is then copied or moved,
    according to ``option.action``, to
    ``<option.destDir>/<YYYY-MM-DD>/<YYYY-MM-DD>_<HH-MM-SS>.jpg``.  When that
    name is taken, ``_1``, ``_2``, ... is appended to the stem.

    Files without a usable date (no EXIF value, or a value that does not
    parse) are logged at debug level and skipped, as are files that raise
    IOError.

    Args:
        option: object providing ``destDir`` and ``action``.
        dirname: directory that contains the entries listed in *names*.
        names: entry names inside *dirname*.

    Raises:
        Exception: anything other than IOError is logged and re-raised.
    """
    destdir = option.destDir
    for name in names:
        # Compare the real extension; a substring test would also accept
        # names such as "a.jpg.bak" and reject "a.Jpg".
        if os.path.splitext(name)[1].lower() != '.jpg':
            continue
        tags = {exiftags.datetime_original: None}
        try:
            origpath = os.path.join(dirname, name)
            j = Jpg(origpath)
            j.getEXIF(tags)
            rawdt = j.exif[exiftags.datetime_original]
            # Drop the reader so its file handle is released before the
            # file is copied or moved.
            del j
            if rawdt is None:
                logging.debug("failed to get exif of: " + dirname + "/" + name)
                continue
            try:
                # The value read from the file ends with NUL bytes; trim them.
                strdt = rawdt.rstrip(b"\0")
                if not isinstance(strdt, str):
                    strdt = strdt.decode('ascii')  # bytes on Python 3
                dt = datetime.datetime.strptime(strdt, '%Y:%m:%d %H:%M:%S')
            except ValueError:
                # Covers undecodable bytes as well as unparsable dates.
                logging.debug("invalid exif date of: " + dirname + "/" + name)
                continue
            # Get date aggregate folder
            day = dt.date().isoformat()
            dtdir = os.path.join(destdir, day)
            if not os.path.exists(dtdir):
                os.mkdir(dtdir)
            if not os.path.isdir(dtdir):
                logging.debug("failed to initialize dir: " + dtdir)
                continue
            filename = day + "_" + dt.time().isoformat().replace(":", "-") + ".jpg"
            newpath = _next_free_path(os.path.join(dtdir, filename))
            if option.action == Action.Copy:
                logging.debug("copying " + origpath + " to " + newpath)
                shutil.copyfile(origpath, newpath)
            elif option.action == Action.Move:
                logging.debug("moving " + origpath + " to " + newpath)
                # shutil.move also works across file systems
                shutil.move(origpath, newpath)
            else:
                logging.debug("error action on: " + origpath + ", newpath: " + newpath)
        except IOError:
            logging.debug("IOError: failed to rename: " + dirname + "/" + name)
        except Exception:
            logging.debug("Filed while rename: " + dirname + "/" + name)
            raise
def usage_and_exit():
    """Print the command-line help to stdout and exit with status 2.

    Raises:
        SystemExit: always, with exit code 2.
    """
    script = __file__
    print(script + " Usage:")
    print("python " + script + " srcdir [destdir] [-h|--help] [-m|--move] [--debug]")
    print("\t-m|--move move files from src to dest, by default its action is copy")
    print("\t--debug enable debug logging")
    print("\tif two dir are specified, 1st dir is src dir, 2nd dir is dest dir")
    print("\tif only one dir is specified, the 1st dir is src dir, the directory containing this script is dest dir")
    # sys.exit is always available; the bare exit() helper is only injected
    # by the site module.
    sys.exit(2)
def main(argv):
    """Parse the command line and sort the pictures found under the source dir.

    Usage: ``srcdir [destdir] [-h|--help] [-m|--move] [--debug]``.  Options
    may appear before or after the directories.  With a single directory
    the destination is the directory that contains this script.

    Args:
        argv: command-line arguments without the program name.

    Raises:
        SystemExit: via usage_and_exit() on ``-h``/``--help``, on unknown
            options, on a wrong number of directories, or when a given
            path is not a directory.
    """
    try:
        # gnu_getopt accepts options after positional arguments, which is
        # the order the usage text documents; plain getopt stops parsing at
        # the first positional.
        optlist, args = getopt.gnu_getopt(argv, 'hm', ['help', 'move', 'debug'])
    except getopt.GetoptError:
        logging.error("getopt error")
        usage_and_exit()

    option = Option()
    option.action = Action.Copy
    loglevel = logging.INFO
    for o, _ in optlist:
        if o in ('-h', '--help'):
            usage_and_exit()
        elif o in ('-m', '--move'):
            option.action = Action.Move
        elif o == '--debug':
            loglevel = logging.DEBUG
    logging.basicConfig(level=loglevel)

    # find src, dest dir
    if len(args) == 1:
        srcdir = args[0]
        destdir = os.path.dirname(os.path.abspath(__file__))
    elif len(args) == 2:
        srcdir, destdir = args
    else:
        usage_and_exit()

    if not os.path.isdir(srcdir):
        # Logged as an error so it is visible at the default log level.
        logging.error("src: " + str(srcdir) + " is not a directory")
        usage_and_exit()

    start = datetime.datetime.now()
    if not os.path.exists(destdir):
        os.makedirs(destdir)
    elif not os.path.isdir(destdir):
        logging.error("dest: " + str(destdir) + " is not a directory")
        usage_and_exit()

    option.srcDir = srcdir
    option.destDir = destdir
    # os.walk exists on Python 2 and 3; os.path.walk was removed in Python 3.
    for dirname, _, filenames in os.walk(srcdir):
        visitjpg(option, dirname, filenames)

    end = datetime.datetime.now()
    logging.info("started when: %s, finished when: %s, cost: %s",
                 start, end, end - start)
if __name__ == '__main__':
    # Executed as a script: hand everything after the program name to main().
    cli_args = sys.argv[1:]
    main(cli_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment