Created
January 5, 2014 05:31
-
-
Save nsfmc/8264799 to your computer and use it in GitHub Desktop.
The real trick is probably to just fix liten or to write my own dedupe program. This is not bad, though.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
doop - a script that moves duplicates out of a camera uploads folder, based on a
report generated by [liten](https://code.google.com/p/liten/)
"DOOP? What's that?" -- philip j. fry | |
(c) 2014 marcos.a.ojeda <marcos@generic.cx> | |
MIT licensed (lol) | |
""" | |
import argparse | |
import csv | |
import hashlib | |
import itertools | |
import os | |
import os.path | |
import re | |
import unittest | |
def pick_dupe(path1, path2):
    """Choose which of two duplicate paths looks like the redundant copy.

    A path counts as a copy when its name, with the extension removed,
    ends in ``-<digits>`` (for example ``foo.1-2.jpg``).

    Args:
        path1: First candidate path.
        path2: Second candidate path.

    Returns:
        The path that carries a copy suffix. When both carry one, the path
        with the numerically larger suffix (``path2`` on a tie). ``False``
        when neither path carries a copy suffix.
    """
    dupe_re = re.compile(r'-(\d+)$')
    m1 = dupe_re.search(os.path.splitext(path1)[0])
    m2 = dupe_re.search(os.path.splitext(path2)[0])
    if m1 and m2:
        # Compare the suffixes as integers: as strings, "10" sorts before
        # "9", which would pick the wrong file.
        if int(m1.group(1)) > int(m2.group(1)):
            return path1
        return path2
    if m1:
        return path1
    if m2:
        return path2
    return False
def _file_md5(path, chunk_size=65536):
    """Return the hex MD5 digest of the file at *path*, read in chunks."""
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


def dedupe(dest_dir, report_path):
    """Move the redundant file of each duplicate pair in a liten report.

    Args:
        dest_dir: Directory that the chosen duplicates are moved into.
        report_path: Path to the liten report (CSV with a ``' Path'`` column).

    A pair is skipped when either file no longer exists, when the two files
    no longer have the same MD5 digest, or when ``pick_dupe`` cannot tell
    which of the two is the copy.
    """
    with open(report_path) as f:
        data = csv.DictReader(f)
        # since liten reports are like
        #   [row1, row2, header, row4, row5, header, ... ]
        # keeping two rows out of every three drops the repeated headers
        filtered = itertools.compress(data, itertools.cycle([1, 1, 0]))
        # DictReader is lazy, so the paths must be collected while the
        # report is still open.
        paths = [row[' Path'].strip() for row in filtered]

    # consecutive paths form the duplicate pairs
    for a, b in zip(paths[::2], paths[1::2]):
        if not os.path.exists(a) or not os.path.exists(b):
            continue
        # confirm the files are dupes still
        if _file_md5(a) != _file_md5(b):
            continue
        dupe = pick_dupe(a, b)
        if not dupe:
            # pick_dupe returns False when neither name has a copy suffix;
            # there is nothing safe to move in that case.
            continue
        print("moving %s to %s" % (os.path.basename(dupe), dest_dir))
        os.rename(dupe, os.path.join(dest_dir, os.path.basename(dupe)))
class PickTest(unittest.TestCase):
    """Checks pick_dupe against plain, dotted and ``-N`` suffixed names."""

    def setUp(self):
        # fixtures: two names without a copy suffix, two names with one
        self.a, self.b = "foo.jpg", "foo.1.jpg"
        self.c, self.d = "foo.1-1.jpg", "foo.1-2.jpg"

    def test_no_dupe(self):
        # neither name ends in -N, so nothing is picked in either order
        for first, second in ((self.a, self.b), (self.b, self.a)):
            self.assertFalse(pick_dupe(first, second))

    def test_dupe(self):
        # only one name ends in -N; it is picked regardless of order
        for first, second in ((self.b, self.c), (self.c, self.b)):
            self.assertEqual(self.c, pick_dupe(first, second))

    def test_two_dupes(self):
        # both names end in -N; the larger suffix is picked
        for first, second in ((self.c, self.d), (self.d, self.c)):
            self.assertEqual(self.d, pick_dupe(first, second))
def main():
    """Parse the command line and run dedupe on the given report."""
    cli = argparse.ArgumentParser(
        prog='doop',
        description='Deduplicate a dropbox camera uploads folder')
    # positional arguments, in the order they appear on the command line
    positionals = (
        ('report_path', 'The full path to the liten report'),
        ('destination', 'A destination directory for duplicate files'),
    )
    for arg_name, arg_help in positionals:
        cli.add_argument(arg_name, type=str, help=arg_help)
    options = cli.parse_args()
    dedupe(options.destination, options.report_path)
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment