Last active
December 8, 2023 13:20
-
-
Save alpden550/06b7109ac4fab56887926972eb6c69fe to your computer and use it in GitHub Desktop.
Sync nested
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import shutil | |
from collections.abc import Iterator | |
from pathlib import Path | |
BUF_SIZE = 65536 | |
type H = dict[str, str] | |
type A = Iterator[tuple[str, str, str | None]] | |
def fetch_file_hash(filename: str) -> str: | |
digest = hashlib.sha256() | |
with Path(filename).open("rb") as file_obj: | |
while buffer := file_obj.read(BUF_SIZE): | |
digest.update(buffer) | |
return digest.hexdigest() | |
def create_paths_and_hashes(path: str) -> H:
    """Map every regular file found recursively under *path* to its hash.

    Entries whose own name starts with a dot are left out. Keys are the
    paths as produced by the recursive walk of ``Path(path)``; each value is
    the hash of the resolved file.
    """
    return {
        str(entry): fetch_file_hash(str(entry.resolve()))
        for entry in Path(path).rglob("*")
        if entry.is_file() and not entry.name.startswith(".")
    }
def determine_actions(source_hashes: H, destination_hashes: H, source: str, destination: str) -> A:
    """Yield the actions that make *destination* mirror *source*.

    Files are matched by their path relative to the respective root, and a
    file is copied only when it is missing from the destination or its
    digest differs there. (Looking a source-rooted path up directly among
    destination-rooted keys never matches when the roots differ, which would
    cause every file to be recopied on every run.)

    Args:
        source_hashes: Path-to-digest mapping for files under *source*.
        destination_hashes: Path-to-digest mapping for files under
            *destination*.
        source: Root directory the keys of *source_hashes* live under.
        destination: Root directory the keys of *destination_hashes* live
            under.

    Yields:
        ``("copy", source_file, target)`` for each new or changed file, where
        ``target`` is a ``Path`` below *destination*, followed by
        ``("delete", destination_file, None)`` for each destination file
        with no counterpart in the source.

    Raises:
        ValueError: If a key is not located under its corresponding root.
    """
    source_root = Path(source)
    destination_root = Path(destination)

    # Re-key the destination by root-relative path so both trees are comparable.
    destination_by_relpath = {
        Path(name).relative_to(destination_root): digest
        for name, digest in destination_hashes.items()
    }

    source_relpaths: set[Path] = set()
    for filename, digest in source_hashes.items():
        relative = Path(filename).relative_to(source_root)
        source_relpaths.add(relative)
        # A missing entry yields None, which never equals a digest string.
        if destination_by_relpath.get(relative) != digest:
            yield "copy", filename, destination_root.joinpath(relative)

    for filename in destination_hashes:
        if Path(filename).relative_to(destination_root) not in source_relpaths:
            yield "delete", filename, None
def sync(source: str, destination: str) -> None:
    """Apply the computed copy and delete actions to *destination*.

    Both trees are hashed, the action plan is derived from those hashes, and
    each action is then carried out on the file system.
    """
    plan = determine_actions(
        create_paths_and_hashes(source),
        create_paths_and_hashes(destination),
        source,
        destination,
    )
    for action, origin, target in plan:
        if action == "copy":
            try:
                shutil.copyfile(origin, target)
            except FileNotFoundError:
                # Retry once after creating the target's parent directories.
                Path(target).parent.mkdir(parents=True, exist_ok=True)
                shutil.copyfile(origin, target)
        elif action == "delete":
            Path(origin).unlink()
def main() -> None:
    """Run a sync from the ``new`` directory into the ``dest`` directory."""
    sync(source="new", destination="dest")


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import shutil | |
import tempfile | |
from pathlib import Path | |
import pytest | |
from sync import sync | |
@pytest.fixture()
def source():
    """Yield the path of a fresh temporary source directory.

    The directory is created with the ``source`` prefix and removed when the
    context manager exits, including when the fixture generator is closed
    early.
    """
    with tempfile.TemporaryDirectory(prefix="source") as directory:
        yield directory
@pytest.fixture()
def destination():
    """Yield the path of a fresh temporary destination directory.

    The directory is created with the ``dest`` prefix and removed when the
    context manager exits, including when the fixture generator is closed
    early.
    """
    with tempfile.TemporaryDirectory(prefix="dest") as directory:
        yield directory
def test_copy_existing_file_to_destination(source, destination):
    """A file at the top of the source tree is copied to the destination."""
    (Path(source) / "file1").write_text("content")

    sync(source, destination)

    copied = Path(destination) / "file1"
    assert copied.exists()
    assert copied.read_text() == "content"
def test_copy_existing_nested_file_to_destination(source, destination):
    """A file inside a source subdirectory is copied to the same relative path."""
    nested_dir = Path(source) / "dir1"
    nested_dir.mkdir(parents=True, exist_ok=True)
    (nested_dir / "file1").write_text("content")

    sync(source, destination)

    # Check the destination tree: asserting on the source file would pass
    # even if sync copied nothing.
    expected = Path(destination) / "dir1" / "file1"
    assert expected.exists()
    assert expected.read_text() == "content"
def test_delete_file_from_destination(source, destination):
    """A destination file with no counterpart in the source is removed."""
    stale = Path(destination) / "file1"
    stale.write_text("content")

    sync(source, destination)

    assert not stale.exists()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment