Created
January 13, 2020 00:30
-
-
Save jbylund/173ac1cfe881a550ea0917f797ed692c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""
put files to copy on
=> pipe 1 (contains files to copy)
copy from camera into tempdir on same filesystem as the import dir (one at a time)
=> pipe 2 (contains files on local fs)
move into correct locations and read metadata (in parallel)
=> pipe 3 (contains files & metadata)
write into sqlite database (one at a time)
=> pipe 4 finished
"""
import argparse
import datetime
import multiprocessing
import os
import shutil
import sqlite3
import tempfile
import time
import exifread
# Sentinel value pushed onto each queue to tell a worker to shut down.
# Workers compare with == rather than `is` (see Worker.run): the sentinel is
# pickled across the process boundary, so identity is not preserved.
poison_pill = "POISON_PILL"
def get_args():
    """Parse the command line and return it as a plain dict.

    Expects a single positional argument: the source directory to import from.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("source")
    parsed = arg_parser.parse_args()
    return vars(parsed)
def get_exif(filename):
    """Return the EXIF tag dict for *filename*, as produced by exifread."""
    with open(filename, 'rb') as image_file:
        return exifread.process_file(image_file)
class Worker(multiprocessing.Process):
    """Base class for one pipeline stage.

    Pulls job dicts from in_queue, runs process_job() on each, and pushes the
    result onto out_queue for the next stage. A worker exits cleanly when it
    reads the poison-pill sentinel from its input queue.
    """

    def __init__(self, in_queue, out_queue):
        super().__init__()
        self.in_queue = in_queue    # jobs to consume
        self.out_queue = out_queue  # processed jobs for the next stage

    def run(self):
        while True:
            next_job = self.in_queue.get()
            # == rather than `is`: the sentinel string is pickled across the
            # process boundary, so identity is not preserved but equality is.
            if next_job == poison_pill:
                print("worker exiting...")
                return
            processed = self.process_job(next_job)
            self.out_queue.put(processed)

    def process_job(self, job):
        """Transform one job dict; subclasses must override this."""
        # Was NotImplementedError("xxx") -- a placeholder message; say what
        # the caller actually has to do.
        raise NotImplementedError("subclasses of Worker must override process_job()")
class CopyWorker(Worker):
    """Stage 1: copy the original file onto the same filesystem as the library.

    Staging under ~/Pictures first means ProcessWorker's later shutil.move()
    is a cheap same-filesystem rename instead of a cross-device copy.
    """

    def process_job(self, job):
        pictures_dir = os.path.expanduser("~/Pictures")
        os.makedirs(pictures_dir, exist_ok=True)
        # mkstemp + close instead of NamedTemporaryFile(delete=False): the
        # original never closed the temp file handle, leaking one file
        # descriptor per copied photo.
        fd, staging_path = tempfile.mkstemp(dir=pictures_dir)
        os.close(fd)
        shutil.copyfile(job["original_file"], staging_path)
        job["staging_file"] = staging_path
        return job
class ProcessWorker(Worker):
    """Stage 2: read EXIF metadata and file the photo into ~/Pictures/YYYY/MM/DD."""

    def process_job(self, job):
        staging_filename = job["staging_file"]
        metadata = get_exif(staging_filename)  # get the metadata out
        # Parse the EXIF timestamp; also cache the parsed value in the
        # metadata dict for downstream consumers.
        # NOTE(review): a file with no "Image DateTime" tag raises KeyError
        # and kills this worker -- confirm all inputs carry EXIF timestamps.
        parsed_timestamp = metadata["parsed_timestamp"] = datetime.datetime.strptime(
            metadata["Image DateTime"].values,
            "%Y:%m:%d %H:%M:%S"
        )
        # move the file into its final resting place
        outdir = os.path.join(
            os.path.expanduser("~/Pictures"),
            parsed_timestamp.strftime("%Y/%m/%d")
        )
        os.makedirs(outdir, exist_ok=True)
        fullpath = self._unique_destination(outdir, os.path.basename(job["original_file"]))
        shutil.move(job["staging_file"], fullpath)
        job["metadata"] = metadata
        job["final_path"] = fullpath
        return job

    @staticmethod
    def _unique_destination(outdir, basename):
        """Return a path in *outdir* that does not clobber an existing file.

        Cameras recycle names (IMG_0001.JPG, ...), and shutil.move silently
        overwrites an existing destination on the same filesystem, so append
        a numeric suffix until the name is free.
        """
        root, ext = os.path.splitext(basename)
        candidate = os.path.join(outdir, basename)
        counter = 1
        while os.path.exists(candidate):
            candidate = os.path.join(outdir, "{}_{}{}".format(root, counter, ext))
            counter += 1
        return candidate
class DBWorker(Worker):
    """Stage 3: single-consumer final stage; logs throughput per job.

    NOTE(review): despite the name (and the sqlite3 import at module top),
    nothing is written to a database yet -- this stage only prints progress.
    """

    def __init__(self, inqueue, outqueue):
        super().__init__(inqueue, outqueue)
        self.files_processed = 0          # jobs seen so far
        self.start_time = time.time()     # basis for the files/sec figure

    def process_job(self, job):
        self.files_processed += 1
        # Drop the bulky EXIF dict before logging / forwarding the job.
        job.pop("metadata", None)
        elapsed = time.time() - self.start_time
        # Guard the division: the first job can arrive with ~zero elapsed time.
        rate = self.files_processed / elapsed if elapsed > 0 else float("inf")
        print(
            "dbworker: {} files in {:.2f}s ({:.2f} files/sec): {}".format(
                self.files_processed,
                elapsed,
                rate,
                job
            )
        )
        # Return the job so Worker.run puts a real record on the "done"
        # queue; the original implicitly returned None.
        return job
class Importer(object): | |
def __init__(self): | |
self.to_copy = multiprocessing.Queue() | |
self.to_process = multiprocessing.Queue() | |
self.to_record = multiprocessing.Queue() | |
self.done = multiprocessing.Queue() | |
def import_photos(self, source): | |
print("importing photos from {}".format(source)) | |
copy_worker = CopyWorker(self.to_copy, self.to_process) | |
process_worker = ProcessWorker(self.to_process, self.to_record) | |
db_worker = DBWorker(self.to_record, self.done) | |
self.enqueue_work(source) | |
# start everything up | |
copy_worker.start() | |
process_workers = [] | |
for _ in range(multiprocessing.cpu_count()): | |
iprocess_worker = ProcessWorker(self.to_process, self.to_record) | |
process_workers.append(iprocess_worker) | |
iprocess_worker.start() | |
db_worker.start() | |
self.to_copy.put(poison_pill) | |
copy_worker.join() | |
for iprocess_worker in process_workers: | |
self.to_process.put(poison_pill) | |
for i, iprocess_worker in enumerate(process_workers): | |
print(i, iprocess_worker.join()) | |
self.to_record.put(poison_pill) | |
db_worker.join() | |
def enqueue_work(self, source): | |
for root, dirs, files in os.walk(source): | |
for ifile in files: | |
fullpath = os.path.join(root, ifile) | |
self.to_copy.put({"original_file": fullpath}) | |
def main():
    """Entry point: parse the command line and run the import."""
    options = get_args()
    Importer().import_photos(options["source"])


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment