This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_when_a_file_exists_in_the_source_but_not_the_destination(): | |
source = {"sha1": "my-file" } | |
dest = {} | |
filesystem = FakeFileSystem() | |
reader = {"/source": source, "/dest": dest} | |
synchronise_dirs(reader.pop, filesystem, "/source", "/dest") | |
assert filesystem == [("COPY", "/source/my-file", "/dest/my-file")] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class FakeFileSystem(list): | |
def copy(self, src, dest): | |
self.append(('COPY', src, dest)) | |
def move(self, src, dest): | |
self.append(('MOVE', src, dest)) | |
def delete(self, dest): | |
self.append(('DELETE', src, dest)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def synchronise_dirs(reader, filesystem, source_root, dest_root): | |
source_hashes = reader(source_root) | |
dest_hashes = reader(dest_root) | |
for sha, filename in src_hashes.items(): | |
if sha not in dest_hashes: | |
sourcepath = source_root / filename | |
destpath = dest_root / filename | |
filesystem.copy(destpath, sourcepath) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_when_a_file_exists_in_the_source_but_not_the_destination(): | |
src_hashes = {'hash1': 'fn1'} | |
dst_hashes = {} | |
actions = determine_actions(src_hashes, dst_hashes, Path('/src'), Path('/dst')) | |
assert list(actions) == [('copy', Path('/src/fn1'), Path('/dst/fn1'))] | |
… | |
def test_when_a_file_has_been_renamed_in_the_source(): | |
src_hashes = {'hash1': 'fn1'} | |
dst_hashes = {'hash1': 'fn2'} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def determine_actions(src_hashes, dst_hashes, src_folder, dst_folder): | |
for sha, filename in src_hashes.items(): | |
if sha not in dst_hashes: | |
sourcepath = Path(src_folder) / filename | |
destpath = Path(dst_folder) / filename | |
yield 'copy', sourcepath, destpath | |
elif dst_hashes[sha] != filename: | |
olddestpath = Path(dst_folder) / dst_hashes[sha] | |
newdestpath = Path(dst_folder) / filename |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def read_paths_and_hashes(root): | |
hashes = {} | |
for folder, _, files in os.walk(root): | |
for fn in files: | |
hashes[hash_file(Path(folder) / fn)] = fn | |
return hashes |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def sync(source, dest): | |
# imperative shell step 1, gather inputs | |
source_hashes = read_paths_and_hashes(source) | |
dest_hashes = read_paths_and_hashes(dest) | |
# step 2: call functional core | |
actions = determine_actions(source_hashes, dest_hashes, source, dest) | |
# imperative shell step 3, apply outputs | |
for action, *paths in actions: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_when_a_file_exists_in_the_source_but_not_the_destination(): | |
src_hashes = {'hash1': 'fn1'} | |
dst_hashes = {} | |
expected_actions = [('COPY', '/src/fn1', '/dst/fn1')] | |
… | |
def test_when_a_file_has_been_renamed_in_the_source(): | |
src_hashes = {'hash1': 'fn1'} | |
dst_hashes = {'hash1': 'fn2'} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
("COPY", "sourcepath", "destpath"), | |
("MOVE", "old", "new"), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
source_files = {'hash1': 'path1', 'hash2': 'path2'} | |
dest_files = {'hash1': 'path1', 'hash2': 'pathX'} |
NewerOlder