Skip to content

Instantly share code, notes, and snippets.

@thehesiod
Created May 12, 2022 02:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thehesiod/4574d96d4ccb8398977e8cf88ba363f7 to your computer and use it in GitHub Desktop.
Save thehesiod/4574d96d4ccb8398977e8cf88ba363f7 to your computer and use it in GitHub Desktop.
async file syncer
import asyncio
import os
from pathlib import Path
from typing import Dict, Optional
import logging
import shutil
from functools import partial
import hashlib
import dataclasses
from asyncpool import AsyncPool
def md5_hash(path: Path, size: int = 2**10) -> str:
    """Return the hexadecimal MD5 digest of the file at *path*.

    The file is opened in binary mode and fed to the hash in chunks of
    *size* bytes, so the whole file is never held in memory at once.

    Args:
        path: File whose contents are hashed.
        size: Number of bytes requested per ``read`` call. A negative value
            is passed straight to ``read`` and therefore reads the rest of
            the file in a single call.

    Returns:
        The MD5 digest of the file contents as a hexadecimal string.

    Raises:
        ValueError: If *size* is 0. ``read(0)`` always returns ``b''``, which
            would end the loop immediately and yield the digest of empty
            input regardless of what the file contains.
    """
    if size == 0:
        raise ValueError("size must be non-zero")

    # The digest is only used to compare file contents, not for security, so
    # say so explicitly; this keeps MD5 usable on FIPS-restricted builds.
    digest = hashlib.md5(usedforsecurity=False)
    with path.open('rb') as f:
        while chunk := f.read(size):
            digest.update(chunk)
    return digest.hexdigest()
@dataclasses.dataclass
class PathInfo:
    """Record describing one file: its path, its stat result and a hash slot."""

    # Location of the file.
    path: Path
    # Stat result captured for the file (callers pass ``path.lstat()``).
    lstat: os.stat_result
    # Optional hash string for the file; stays None unless a caller fills it in.
    hash: Optional[str] = dataclasses.field(default=None)
async def main():
    """Sync files from the source folder into the destination folder.

    The destination folder is indexed by file name first. Every entry of the
    source folder is then handed to a worker pool:

    * no destination entry with that name -> the source file is moved over;
    * same name, same size and same MD5 -> the source copy is deleted;
    * same name but different size or MD5 -> ``AssertionError`` is raised,
      because no conflict policy has been decided yet.
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    # Raw string: the UNC path is full of backslashes, and sequences such as
    # "\D" and "\C" are invalid escapes in a normal string literal.
    source_folder = Path(r'\\192.168.1.79@8081\DavWWWRoot\DCIM\Camera')
    dest_folder = Path('E:\\Pictures\\Phones\\Dayana')

    # Index the destination by file name so each source file is a dict lookup.
    dest_files: Dict[str, PathInfo] = {}
    for idx, dst_path in enumerate(dest_folder.glob('*')):
        if idx % 100 == 0:
            print(f"Gather Dest folder file {idx}")

        # Dot-prefixed entries are left out of the index.
        if dst_path.name.startswith('.'):
            continue

        assert dst_path.name not in dest_files
        dest_files[dst_path.name] = PathInfo(dst_path, dst_path.lstat())

    async def _process_path(src_path: Path):
        """Move, de-duplicate or reject a single source file."""
        dst_path = dest_folder / src_path.name
        dst_file_info = dest_files.get(src_path.name)

        if dst_file_info is None:
            logger.info(f'moving {src_path} to {dst_path}')
            await asyncio.to_thread(shutil.move, src_path, dst_path)
            return

        # lstat hits the (network) filesystem; run it in a thread so the
        # event loop stays free for the other workers.
        src_lstat = await asyncio.to_thread(src_path.lstat)

        # Only hash when the sizes match; a size mismatch already proves the
        # files differ.
        if src_lstat.st_size == dst_file_info.lstat.st_size:
            src_md5, dst_md5 = await asyncio.gather(
                asyncio.to_thread(md5_hash, src_path),
                asyncio.to_thread(md5_hash, dst_path),
            )
            if src_md5 == dst_md5:
                logger.info(f'deleting {src_path} already exists at {dst_path}')
                await asyncio.to_thread(src_path.unlink)
                return

        # TODO: decide what to do when the contents differ. Raised explicitly
        # (rather than `assert False`) so the check survives `python -O`.
        raise AssertionError(
            f'{src_path} differs from existing {dst_path}; no conflict policy defined'
        )

    # NOTE(review): presumably 7 is the number of concurrent workers — confirm
    # against the asyncpool documentation.
    async with AsyncPool(None, 7, 'file-workpool', logger, _process_path) as wp:
        for idx, src_path in enumerate(source_folder.glob('*')):
            if idx % 100 == 0:
                print(f"Processing folder {idx}")

            await wp.push(src_path)
if __name__ == "__main__":
    # Executed as a script: drive the main() coroutine to completion.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment