Skip to content

Instantly share code, notes, and snippets.

@jbylund
Created January 13, 2020 00:30
Show Gist options
  • Save jbylund/173ac1cfe881a550ea0917f797ed692c to your computer and use it in GitHub Desktop.
Save jbylund/173ac1cfe881a550ea0917f797ed692c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
put files to copy on
=> pipe 1 (contains files to copy)
copy from camera into tempdir on same filesystem as the import dir (one at a time)
=> pipe 2 (contains files on local fs)
move into correct locations and read metadata (in parallel)
=> pipe 3 (contains files & metadata)
write into sqlite database (one at a time)
=> pipe 4 finished
"""
import argparse
import datetime
import multiprocessing
import os
import shutil
import sqlite3
import tempfile
import time
import exifread
poison_pill = "POISON_PILL"
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("source")
return vars(parser.parse_args())
def get_exif(filename):
with open(filename, 'rb') as fh:
return exifread.process_file(fh)
class Worker(multiprocessing.Process):
def __init__(self, in_queue, out_queue):
super().__init__()
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
while True:
next_job = self.in_queue.get()
if next_job == poison_pill: # equals instead of is
print("worker exiting...")
return
processed = self.process_job(next_job)
self.out_queue.put(processed)
def process_job(self, job):
raise NotImplementedError("xxx")
class CopyWorker(Worker):
def process_job(self, job):
# copy from where it lies to somewhere on the same fs as the library
pictures_dir = os.path.expanduser("~/Pictures")
os.makedirs(pictures_dir, exist_ok=True)
newpath = tempfile.NamedTemporaryFile(dir=pictures_dir, delete=False)
shutil.copyfile(job["original_file"], newpath.name)
job["staging_file"] = newpath.name
return job
class ProcessWorker(Worker):
def process_job(self, job):
staging_filename = job["staging_file"]
metadata = get_exif(staging_filename) # get the metadata out
# add parsed timestamp into the metadata
parsed_timestamp = metadata["parsed_timestamp"] = datetime.datetime.strptime(
metadata["Image DateTime"].values,
"%Y:%m:%d %H:%M:%S"
)
# move the file into its final resting place
outdir = os.path.join(
os.path.expanduser("~/Pictures"),
parsed_timestamp.strftime("%Y/%m/%d")
)
os.makedirs(outdir, exist_ok=True)
fullpath = os.path.join(outdir, os.path.basename(job["original_file"]))
shutil.move(
job["staging_file"],
fullpath
)
job["metadata"] = metadata
job["final_path"] = fullpath
return job
class DBWorker(Worker):
def __init__(self, inqueue, outqueue):
super().__init__(inqueue, outqueue)
self.files_processed = 0
self.start_time = time.time()
def process_job(self, job):
self.files_processed += 1
job.pop("metadata", None)
print(
"dbworker: {} files in {:.2f}s ({:.2f} files/sec): {}".format(
self.files_processed,
time.time() - self.start_time,
self.files_processed / (time.time() - self.start_time),
job
)
)
class Importer(object):
def __init__(self):
self.to_copy = multiprocessing.Queue()
self.to_process = multiprocessing.Queue()
self.to_record = multiprocessing.Queue()
self.done = multiprocessing.Queue()
def import_photos(self, source):
print("importing photos from {}".format(source))
copy_worker = CopyWorker(self.to_copy, self.to_process)
process_worker = ProcessWorker(self.to_process, self.to_record)
db_worker = DBWorker(self.to_record, self.done)
self.enqueue_work(source)
# start everything up
copy_worker.start()
process_workers = []
for _ in range(multiprocessing.cpu_count()):
iprocess_worker = ProcessWorker(self.to_process, self.to_record)
process_workers.append(iprocess_worker)
iprocess_worker.start()
db_worker.start()
self.to_copy.put(poison_pill)
copy_worker.join()
for iprocess_worker in process_workers:
self.to_process.put(poison_pill)
for i, iprocess_worker in enumerate(process_workers):
print(i, iprocess_worker.join())
self.to_record.put(poison_pill)
db_worker.join()
def enqueue_work(self, source):
for root, dirs, files in os.walk(source):
for ifile in files:
fullpath = os.path.join(root, ifile)
self.to_copy.put({"original_file": fullpath})
def main():
args = get_args()
importer = Importer()
importer.import_photos(args["source"])
if "__main__" == __name__:
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment