Last active
July 31, 2017 09:54
-
-
Save jontwo/ce3fc0f28be318344a3afdd91ed41684 to your computer and use it in GitHub Desktop.
Helper library for commonly-used file IO tasks. Just add to your python path and import.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Name: ioutils.py | |
Purpose: Utilities module for file read/write/delete/etc. | |
Author: jpm | |
Created: 17/03/2016 | |
Copyright: (c) jpm 2016 | |
Licence: GPL v3. See http://www.gnu.org/licenses/ | |
""" | |
import logging | |
import logging.handlers | |
import os | |
import shutil | |
import stat | |
import subprocess | |
import sys | |
class SpecialFormatter(logging.Formatter):
    """
    Log formatter that picks its format string from the record's level.

    DEBUG records include the file name and line number, INFO records are just
    the message, and every other level is prefixed with the level name.
    """
    # Format string per log level; 'DEFAULT' is used for any level not listed
    FORMATS = {
        logging.DEBUG: "DEBUG: %(filename)s %(lineno)d: %(message)s",
        logging.INFO: "%(message)s",
        'DEFAULT': "%(levelname)s: %(message)s"
    }

    def __init__(self, *args, **kwargs):
        """
        Accepts the same arguments as logging.Formatter and passes them through
        """
        logging.Formatter.__init__(self, *args, **kwargs)
        # One delegate formatter per format string, created on first use
        self._delegates = {}

    def format(self, record):
        """
        Format a record using the format string registered for its level
        :param record: logging.LogRecord to format
        :return: Formatted log line
        """
        fmt = self.FORMATS.get(record.levelno, self.FORMATS['DEFAULT'])
        delegate = self._delegates.get(fmt)
        if delegate is None:
            # Assigning self._fmt is not enough on Python 3, where the format
            # string is held by an internal style object, so delegate to a
            # formatter that was constructed with the wanted format instead.
            delegate = logging.Formatter(fmt, self.datefmt)
            self._delegates[fmt] = delegate
        return delegate.format(record)
class IOUtils(object):
    """
    Helper for commonly-used file IO tasks (read/write/delete/rename/copy)
    plus convenience wrappers around a logger.

    File operations are best-effort: failures are reported on self.logger
    instead of being raised.
    """
    # Logger used by all methods; set per instance in __init__
    logger = None

    def __init__(self, log=None):
        """
        :param log: Optional - an existing log object.
        If logging is required, either an existing log object can be submitted on __init__,
        logBasicConfig or logAdvancedConfig called before use, or log handlers can be added
        to self.logger. Otherwise this module's logger is used with only a NullHandler
        attached, so nothing is output by this logger itself
        """
        if log is not None:
            self.logger = log
        else:
            self.logger = logging.getLogger(__name__)
            # getLogger returns the same object for every instance, so only
            # attach a NullHandler if one is not already there
            if not any(isinstance(h, logging.NullHandler) for h in self.logger.handlers):
                self.logger.addHandler(logging.NullHandler())
            self.logger.setLevel(logging.NOTSET)

    @staticmethod
    def _makeWritable(path):
        """
        Add owner write permission to a path, keeping its other permission bits
        :param path: File or directory to update
        """
        os.chmod(path, stat.S_IMODE(os.stat(path).st_mode) | stat.S_IWUSR)

    def fileWrite(self, filename, data, overwrite=False):
        """
        Write to a file and log a warning if there are errors
        :param filename: Full path of output file
        :param data: String to write to file
        :param overwrite: True to overwrite, False (default) to append
        """
        write_mode = 'w' if overwrite else 'a'
        try:
            with open(filename, write_mode) as f:
                f.write(data)
        except (OSError, IOError):
            # IOError is listed as well because open() raises it on Python 2
            self.logger.warning('failed to write to %s', filename)

    @staticmethod
    def fileRead(filename):
        """
        Read from a file
        :param filename: Full path of file
        :return: File contents as string or empty string if there was an error
        """
        try:
            with open(filename, 'r') as f:
                return f.read()
        except (OSError, IOError):
            return ''

    def fileDelete(self, filename):
        """
        Attempt to delete files or folders in the given path.
        Just log a warning and carry on if any can't be deleted.
        Symbolic links are removed themselves; their targets are left untouched.
        :param filename: name of file or folder to delete
        """
        if os.path.islink(filename):
            # remove the link only - never chmod or recurse into its target
            try:
                os.unlink(filename)
            except OSError:
                self.logger.warning('could not delete %s', filename)
            return
        if os.path.isfile(filename):
            # single file - delete it and return
            try:
                self._makeWritable(filename)
                os.unlink(filename)
            except OSError:
                self.logger.warning('could not delete %s', filename)
            return
        if os.path.isdir(filename):
            # directory - delete subfiles and folders then delete it
            try:
                children = os.listdir(filename)
            except OSError:
                self.logger.warning('could not delete %s', filename)
                return
            for child in children:
                self.fileDelete(os.path.join(filename, child))
            try:
                self._makeWritable(filename)
                os.rmdir(filename)
            except OSError:
                self.logger.warning('could not delete %s', filename)

    def fileRename(self, filename, newname):
        """
        Attempt to rename a file. Log a warning if there is an error
        :param filename: Old name
        :param newname: New name
        """
        try:
            # only add the write bit, so the renamed file keeps its other permissions
            self._makeWritable(filename)
            os.rename(filename, newname)
        except OSError:
            self.logger.warning('could not rename %s to %s', filename, newname)

    def fileCopy(self, filename, dest):
        """
        Attempt to copy a file. Log a warning if there is an error
        :param filename: File to copy
        :param dest: Destination directory for copied file. Will be created if not found
        """
        try:
            if not os.path.exists(dest):
                self.makeDir(dest)
            shutil.copy2(filename, os.path.join(dest, os.path.basename(filename)))
        except (OSError, IOError, shutil.Error):
            # on Python 2 shutil reports failures as IOError or shutil.Error
            self.logger.warning('could not copy %s to %s', filename, dest)

    def makeDir(self, dirname):
        """
        Make a directory. If directory name does not include pardir (..), also
        makes any parent dirs necessary. Logs an error if the directory could
        not be created (including when it already exists)
        :param dirname: Directory name to create
        """
        try:
            if os.path.pardir in dirname:
                os.mkdir(dirname)
            else:
                # best to use recursive make dir, but
                # only works if dest does not contain pardir (..)
                os.makedirs(dirname)
        except OSError:
            self.logger.error('could not create directory %s', dirname)

    @staticmethod
    def getFileSize(filename):
        """
        Find the size of a file
        :param filename: Full file path
        :return: File size or 0 if there is an error
        """
        try:
            return os.path.getsize(filename)
        except OSError:
            return 0

    def doRobocopy(self, src, dest, retries=5, subfolders=True):
        """
        Copy from one folder to another using robocopy.
        Any errors are written to the log
        :param src: Source folder
        :param dest: Destination folder
        :param retries: Number of retries if there is an error (default 5)
        :param subfolders: Copy subfolders too (default true)
        """
        command = ['robocopy', src, dest]
        if subfolders:
            # appended conditionally so that no empty argument is ever passed
            command.append('/s')
        # NOTE(review): /mir is documented as equivalent to /e plus /purge, so
        # subfolders are probably still mirrored when subfolders is False - confirm
        # default options - avoid lock and system files
        command.extend([
            '/mir', '/r:{}'.format(retries), '/w:5', '/tee', '/fft', '/dst', '/dcopy:t',
            '/xf', '._*', '.ds_store', 'pagefile.sys', 'thumbs.db', '*.ldb', '*.lock', '*.lck',
            'ntuser.dat*', 'Thumbs.db:encryptable',
            # no embedded quotes: subprocess quotes list arguments itself
            '/xd', '.*', 'System Volume Information', 'RECYCLER',
        ])
        try:
            # call (not check_call): robocopy uses non-zero exit codes for
            # success too, and exit code 0 also needs reporting
            returncode = subprocess.call(command)
        except OSError as e:
            self.logger.error('could not run robocopy: %s', e)
            return
        self._logRobocopyResult(returncode)

    def _logRobocopyResult(self, returncode):
        """
        Log the meaning of a robocopy exit code, which is a set of bit flags
        :param returncode: Exit code returned by robocopy
        """
        if returncode == 0:
            self.logger.warning('no files copied - no changes detected')
        if returncode & 2 != 0:
            self.logger.info('extra files detected')
        if returncode & 4 != 0:
            self.logger.warning('mismatched files detected')
        if returncode & 8 != 0:
            self.logger.error('failed to copy files')
        if returncode & 16 != 0:
            self.logger.error('serious copy error')

    def logToFile(self, filepath, rotate=False):
        """
        Send log output to a file
        :param filepath: Full path of file
        :param rotate: Use a rotating file handler, default is False
        """
        if rotate:
            fh = logging.handlers.RotatingFileHandler(filepath, maxBytes=1000000, backupCount=3)
        else:
            fh = logging.FileHandler(filepath)
        fh.setLevel(logging.DEBUG)
        ff = logging.Formatter('%(asctime)s: %(levelname)s: %(name)s: %(message)s')
        fh.setFormatter(ff)
        self.logger.addHandler(fh)

    def logBasicConfig(self):
        """
        Use default log parameters
        """
        logging.basicConfig()

    def logAdvancedConfig(self):
        """
        Use a more advanced log format: info level just logs the message, debug
        includes file name and line number, other levels include the level name.
        Output goes to stderr
        """
        self.logger.setLevel(logging.DEBUG)
        sh = logging.StreamHandler(sys.stderr)
        sh.setFormatter(SpecialFormatter())
        sh.setLevel(logging.DEBUG)
        self.logger.addHandler(sh)

    def logFormat(self, format):
        """
        Define log format using attributes such as %(asctime)s %(filename)s %(name)
        %(funcName)s %(levelname)s %(levelno)s %(lineno)d %(message)s %(thread)d %(threadName)s
        This format is applied to all handlers for the current logger
        """
        fmt = logging.Formatter(format)
        for h in self.logger.handlers:
            h.setFormatter(fmt)

    def logLevel(self, level):
        """
        Set current logger level
        """
        self.logger.setLevel(level)

    def writeLog(self, msg, level=logging.INFO):
        """
        Write a log message at the given level (default info). The message is
        still subject to the current logger and handler levels
        """
        self.logger.log(level, msg)

    def writeDebug(self, msg):
        """
        Write a debug message to the current logger
        """
        self.logger.debug(msg)

    def writeInfo(self, msg):
        """
        Write an info message to the current logger
        """
        self.logger.info(msg)

    def writeWarning(self, msg):
        """
        Write a warning message to the current logger
        """
        self.logger.warning(msg)

    def writeError(self, msg):
        """
        Write an error message to the current logger
        """
        self.logger.error(msg)

    def writeCritical(self, msg):
        """
        Write a critical message to the current logger
        """
        self.logger.critical(msg)
Added some logging methods
Added single file copy and updated Robocopy options
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Typical usage:
etc