Skip to content

Instantly share code, notes, and snippets.

@kkirsche
Last active August 12, 2022 12:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kkirsche/1791bc608c2cc0156d0de018ba41046d to your computer and use it in GitHub Desktop.
Save kkirsche/1791bc608c2cc0156d0de018ba41046d to your computer and use it in GitHub Desktop.
rsync algorithm in Python
#!/usr/bin/env python
# based on
# https://tylercipriani.com/blog/2017/07/09/the-rsync-algorithm-in-python/
from collections.abc import Generator
from hashlib import md5
from io import BufferedReader, TextIOWrapper
from logging import DEBUG, INFO, basicConfig, getLogger
from os import PathLike
from pathlib import Path
from typing import TypeAlias, Union, overload
from zlib import adler32
__all__ = [
"Chunks",
"Signature",
"_Openable",
"file",
]
BLOCK_SIZE = 4_096
ENABLE_DEBUG: bool = True
_Adler32Checksum: TypeAlias = int
_ChunkIndex: TypeAlias = int
_FileChunkBytes: TypeAlias = bytes
_FileChunkStr: TypeAlias = str
_FileChunk: TypeAlias = Union[_FileChunkBytes, _FileChunkStr]
_Md5Hash: TypeAlias = str
_Openable: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes] | int
basicConfig()
logger = getLogger(__name__)
logger.setLevel(DEBUG) if ENABLE_DEBUG else logger.setLevel(INFO)
@overload
def iter_file(f: BufferedReader) -> Generator[_FileChunkBytes, None, None]:
...
@overload
def iter_file(f: TextIOWrapper) -> Generator[_FileChunkStr, None, None]:
...
def iter_file(f: BufferedReader | TextIOWrapper) -> Generator[_FileChunk, None, None]:
while True:
logger.debug("reading chunk")
chunk = f.read(BLOCK_SIZE)
if not chunk:
logger.debug("no chunk located")
break
yield chunk
def md5_chunk(chunk: _FileChunk) -> _Md5Hash:
"""Return md5 checksum of chunk."""
logger.debug("creating md5 hash of chunk")
if isinstance(chunk, _FileChunkStr):
chunk = chunk.encode()
return md5(chunk, usedforsecurity=False).hexdigest()
def adler32_chunk(chunk: _FileChunk) -> _Adler32Checksum:
"""Return adler32 chucksum for chunk."""
logger.debug("creating adler32 checksum of chunk")
if isinstance(chunk, _FileChunkStr):
chunk = chunk.encode()
return adler32(chunk)
class Signature:
md5: _Md5Hash
adler32: _Adler32Checksum
def __init__(self, chunk: _FileChunk) -> None:
"""Initialize a new signature."""
logger.debug("creating signature for chunk")
self.md5 = md5_chunk(chunk)
self.adler32 = adler32_chunk(chunk)
class Chunks:
"""A data structure that holds the rolling checksums for files."""
chunks: list[Signature]
# chunk_sigs[checksum][md5][chunk_len]
chunk_sigs: dict[_Adler32Checksum, dict[_Md5Hash, _ChunkIndex]]
def __init__(self) -> None:
"""Initialize a chunks registry."""
logger.debug("creating chunk signature registry")
self.chunks = []
self.chunk_sigs = {}
def append(self, sig: Signature) -> None:
logger.debug("appending signature")
self.chunks.append(sig)
self.chunk_sigs.setdefault(sig.adler32, {})
self.chunk_sigs[sig.adler32][sig.md5] = len(self.chunks) - 1
def get_chunk(self, chunk: _FileChunk) -> _ChunkIndex | None:
logger.debug("retrieving chunk of file")
try:
checksum = adler32_chunk(chunk)
md5_hash = md5_chunk(chunk)
located_chunk = self.chunk_sigs[checksum][md5_hash]
logger.debug("chunk located")
return located_chunk
except KeyError:
logger.debug("chunk missing")
return None
def __getitem__(self, idx: int) -> Signature:
logger.debug("getting signature at idx %d", idx)
return self.chunks[idx]
def __len__(self) -> int:
logger.debug("retrieving signature count")
return len(self.chunks)
def __iter__(self) -> Generator[Signature, None, None]:
logger.debug("iterating over signatures")
yield from self.chunks
def checksums_file(filename: _Openable) -> Chunks:
"""Returns object with checksums of file."""
logger.debug("creating signatures for %s", filename)
chunks = Chunks()
with open(filename, "r") as f:
for chunk in iter_file(f):
sig = Signature(chunk)
chunks.append(sig)
logger.debug("signatures created")
return chunks
def _get_chunk_list(
file_one: _Openable, file_two: _Openable
) -> list[_FileChunk | _ChunkIndex]:
"""The good stuff :)
1. Create a rolling checksum of file_two.
2. For each chunk in file one, determine if chunk is already in file two
a. If so:
i. return the index of that chunk.
ii. move the read head by the size of a chunk.
b. If not:
i. return the next byte.
ii. move the read head by 1 byte.
3. Start over at 2 until you're out of file to read.
"""
logger.debug("creating signatures for file two")
checksums = checksums_file(file_two)
logger.debug("signatures created")
chunks: list[_FileChunk | _ChunkIndex] = []
offset = 0
logger.debug("opening %s", file_one)
with open(file_one, "r") as f:
for chunk in iter_file(f):
located_chunk = checksums.get_chunk(chunk)
if located_chunk is None:
logger.debug("no signature located")
offset += 1
chunks.append(chunk[0])
f.seek(offset)
continue
logger.info("signature located")
offset += BLOCK_SIZE
chunks.append(located_chunk)
logger.debug("comparison completed, returning signatures")
return chunks
def file(file_one: _Openable, file_two: _Openable) -> str:
if isinstance(file_one, Path):
file_one = str(file_one.expanduser().resolve())
if isinstance(file_two, Path):
file_two = str(file_two.expanduser().resolve())
output = ""
logger.debug("opening file two")
with open(file_two, "r") as ft:
for chunk in _get_chunk_list(file_one, file_two):
if isinstance(chunk, int):
logger.debug("hunk found")
ft.seek(chunk * BLOCK_SIZE)
content = ft.read(BLOCK_SIZE)
output += content
continue
if isinstance(chunk, bytes):
chunk = chunk.decode()
output += chunk
return output
if __name__ == "__main__":
file_one = Path("./f1.txt")
file_two = Path("./f2.txt")
output = file(file_one, file_two)
print(output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment