Skip to content

Instantly share code, notes, and snippets.

@RhetTbull
Last active February 20, 2024 07:53
Show Gist options
  • Save RhetTbull/4510d5f9912b9c703d34dac4c1afc16a to your computer and use it in GitHub Desktop.
Save RhetTbull/4510d5f9912b9c703d34dac4c1afc16a to your computer and use it in GitHub Desktop.
Python method to copy a file with a callback (e.g. to show a progress bar). Includes example of copying with both click.progressbar and tqdm.
""" Copy a file with callback (E.g. update a progress bar) """
# based on flutefreak7's answer at StackOverflow
# https://stackoverflow.com/questions/29967487/get-progress-back-from-shutil-file-copy-thread/48450305#48450305
# License: MIT License
import os
import pathlib
import shutil
import stat
# Default chunk size, in bytes, for each read/write cycle of the copy.
# For comparison, shutil.copy uses 1024 * 1024 if _WINDOWS else 64 * 1024;
# the original author's testing on macOS with an SSD found that a much larger
# buffer is faster, hence 4 MiB.
BUFFER_SIZE = 4 * 1024 * 1024
class SameFileError(OSError):
    """Signals that the source and the destination refer to the same file."""
class SpecialFileError(OSError):
    """Signals an operation (e.g. copying) that is not supported on a
    special file such as a named pipe."""
def copy_with_callback(
    src, dest, callback=None, follow_symlinks=True, buffer_size=BUFFER_SIZE
):
    """Copy a single file, optionally reporting progress through a callback.

    The content is copied in chunks of ``buffer_size`` bytes; afterwards the
    permission bits are copied with ``shutil.copymode``.

    Args:
        src: source file; must exist and be a regular file (or a symlink
            to one)
        dest: destination path; if an existing directory, the file is copied
            into that directory under the source's name; otherwise it is
            taken to be the destination filename
        callback: optional callable, called after every chunk is written as
            callback(bytes copied since last callback, total bytes copied,
            total bytes in source file)
        follow_symlinks: bool; if True, follows symlinks. If False and src is
            a symlink, a new symlink with the same target is created at the
            destination instead of copying data (the callback is not called)
        buffer_size: how many bytes to copy before each call to the callback,
            default = 4 MiB; must not be 0

    Returns:
        Path of the destination file, as a str

    Raises:
        FileNotFoundError: if src doesn't exist or is not a regular file
        SameFileError: if src and dest are the same file
        SpecialFileError: if src or dest is a named pipe
        ValueError: if callback is given but not callable, or buffer_size is 0

    Note: Does not copy extended attributes, resource forks or other metadata.
    """
    srcfile = pathlib.Path(src)
    destpath = pathlib.Path(dest)

    if not srcfile.is_file():
        raise FileNotFoundError(f"src file `{src}` doesn't exist")

    destfile = destpath / srcfile.name if destpath.is_dir() else destpath

    if destfile.exists() and srcfile.samefile(destfile):
        raise SameFileError(
            f"source file `{src}` and destinaton file `{dest}` are the same file."
        )

    # check for special files, lifted from shutil.copy source
    for fname in (srcfile, destfile):
        try:
            st = os.stat(str(fname))
        except OSError:
            # File most likely does not exist; nothing to check
            continue
        if stat.S_ISFIFO(st.st_mode):
            raise SpecialFileError(f"`{fname}` is a named pipe")

    if callback is not None and not callable(callback):
        raise ValueError("callback is not callable")

    # read(0) always returns b"", so a zero-sized buffer would truncate the
    # destination and report success without copying anything; reject it
    # before the destination is opened.
    if buffer_size == 0:
        raise ValueError("buffer_size must not be 0")

    if not follow_symlinks and srcfile.is_symlink():
        # lexists (unlike exists) also detects a dangling symlink at the
        # destination, which would otherwise make os.symlink fail.
        if os.path.lexists(destfile):
            os.unlink(destfile)
        os.symlink(os.readlink(str(srcfile)), str(destfile))
    else:
        size = os.stat(src).st_size
        with open(srcfile, "rb") as fsrc, open(destfile, "wb") as fdest:
            _copyfileobj(
                fsrc, fdest, callback=callback, total=size, length=buffer_size
            )

    # Honour follow_symlinks here too (as shutil.copy does) so that copying
    # a symlink never chmods the file the link points at.
    shutil.copymode(str(srcfile), str(destfile), follow_symlinks=follow_symlinks)
    return str(destfile)
def _copyfileobj(fsrc, fdest, callback, total, length):
    """Stream data from fsrc into fdest, chunk by chunk.

    Args:
        fsrc: file object to read from
        fdest: file object to write to
        callback: callable invoked after each chunk is written, as
            callback(chunk size, bytes copied so far, total); may be None,
            in which case no progress is reported
        total: total bytes in the source file (only forwarded to callback)
        length: maximum number of bytes requested per read
    """
    bytes_done = 0
    chunk = fsrc.read(length)
    # An empty read marks the end of the source.
    while chunk:
        fdest.write(chunk)
        chunk_len = len(chunk)
        bytes_done += chunk_len
        if callback is not None:
            callback(chunk_len, bytes_done, total)
        chunk = fsrc.read(length)
# -------------------- example usage with different progress bars --------------------
import time
import click
from tqdm import tqdm
@click.command(help="Copy a file from SRCFILE to DESTFILE with click.progressbar.")
@click.argument("srcfile", type=click.Path(exists=True, file_okay=True, dir_okay=False))
@click.argument("destfile", type=click.Path())
@click.option("--nofollow", help="Do not follow symlinks.", is_flag=True, default=False)
@click.option("--bufsize", help=f"Buffer size; default is {BUFFER_SIZE}.", type=int)
@click.option(
    "--tqdm",
    "tqdm_",
    help="Use tqdm progress bar instead of click.",
    is_flag=True,
    default=False,
)
@click.option(
    "--noprogress",
    "noprogress",
    help="Don't use a progress bar.",
    is_flag=True,
    default=False,
)
def main(srcfile, destfile, nofollow, tqdm_, noprogress, bufsize):
    """Demonstrate copy_with_callback with a click or tqdm progress bar.

    Args:
        srcfile: path of the existing file to copy
        destfile: destination path (file name or existing directory)
        nofollow: if True, symlinks are not followed
        tqdm_: if True, show a tqdm progress bar instead of click's
        noprogress: if True, copy without any progress bar
        bufsize: chunk size in bytes; BUFFER_SIZE is used when not given (or 0)
    """
    bufsize = bufsize or BUFFER_SIZE
    size = os.stat(srcfile).st_size
    follow = not nofollow

    def _copy(callback=None):
        # Single call site shared by all three progress modes.
        return copy_with_callback(
            srcfile,
            destfile,
            follow_symlinks=follow,
            callback=callback,
            buffer_size=bufsize,
        )

    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # system clock adjustments the way time.time() can.
    start_t = time.perf_counter()
    if noprogress:
        dest = _copy()
    else:
        progress = tqdm(total=size) if tqdm_ else click.progressbar(length=size)
        with progress as bar:
            # Both bar types advance by the number of bytes in the last chunk.
            dest = _copy(lambda copied, total_copied, total: bar.update(copied))
    delta_t = time.perf_counter() - start_t
    click.echo(f"Done: copied {size} bytes to {dest} in {delta_t:.3f} seconds.")
# Run the example command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment