Skip to content

Instantly share code, notes, and snippets.

@jontwo
Last active July 31, 2017 09:54
Show Gist options
  • Save jontwo/ce3fc0f28be318344a3afdd91ed41684 to your computer and use it in GitHub Desktop.
Save jontwo/ce3fc0f28be318344a3afdd91ed41684 to your computer and use it in GitHub Desktop.
Helper library for commonly-used file IO tasks. Just add to your python path and import.
"""
Name: ioutils.py
Purpose: Utilities module for file read/write/delete/etc.
Author: jpm
Created: 17/03/2016
Copyright: (c) jpm 2016
Licence: GPL v3. See http://www.gnu.org/licenses/
"""
import logging
import logging.handlers
import os
import shutil
import stat
import subprocess
import sys
class SpecialFormatter(logging.Formatter):
    """
    Log formatter that chooses its format string from the level of each record.

    DEBUG records show the source file name and line number, INFO records show
    only the message, and every other level is prefixed with the level name.
    """
    # Format string per level number; 'DEFAULT' covers any level not listed here
    FORMATS = {
        logging.DEBUG: "DEBUG: %(filename)s %(lineno)d: %(message)s",
        logging.INFO: "%(message)s",
        'DEFAULT': "%(levelname)s: %(message)s"
    }

    def format(self, record):
        """
        Format a record with the format string registered for its level
        :param record: The logging.LogRecord to format
        :return: The formatted log line as a string
        """
        fmt = self.FORMATS.get(record.levelno, self.FORMATS['DEFAULT'])
        # Delegate to a formatter built for the chosen format string rather than
        # assigning self._fmt: on Python 3 the base class renders from an internal
        # style object, so changing self._fmt alone would not change the output.
        return logging.Formatter(fmt, self.datefmt).format(record)
class IOUtils(object):
    """
    Helper for common file tasks (read, write, delete, rename, copy, make directory)
    plus a few convenience wrappers around the logging module.

    File operations that fail are reported through self.logger instead of being
    raised, so callers can treat them as best-effort.
    """
    # Logger used by every method; replaced per instance in __init__
    logger = None

    def __init__(self, log=None):
        """
        :param log: Optional - an existing log object.
        If logging is required, either an existing log object can be submitted on __init__,
        logBasicConfig or logAdvancedConfig called before use, or log handlers can be added
        to self.logger. Otherwise, all logging is suppressed
        """
        if log:
            self.logger = log
        else:
            self.logger = logging.getLogger(__name__)
            # getLogger returns the same object on every call, so add the NullHandler
            # only once instead of stacking one per IOUtils instance
            if not any(isinstance(h, logging.NullHandler) for h in self.logger.handlers):
                self.logger.addHandler(logging.NullHandler())
            self.logger.setLevel(logging.NOTSET)

    @staticmethod
    def _makeWritable(path):
        """
        Add the owner-write permission bit to a path, keeping all its other bits
        :param path: File or directory whose mode is updated
        """
        # Setting the mode to S_IWUSR alone would strip the read/execute bits
        os.chmod(path, stat.S_IMODE(os.stat(path).st_mode) | stat.S_IWUSR)

    def fileWrite(self, filename, data, overwrite=False):
        """
        Write to a file and log a warning if there are errors
        :param filename: Full path of output file
        :param data: String to write to file
        :param overwrite: True to overwrite, False (default) to append
        """
        write_mode = 'w' if overwrite else 'a'
        try:
            with open(filename, write_mode) as f:
                f.write(data)
        except (OSError, IOError):
            # IOError is listed as well because open() raises it on Python 2
            self.logger.warning('failed to write to {}'.format(filename))

    @staticmethod
    def fileRead(filename):
        """
        Read from a file
        :param filename: Full path of file
        :return: File contents as string or empty string if there was an error
        """
        try:
            with open(filename, 'r') as f:
                return f.read()
        except (OSError, IOError):
            return ''

    def fileDelete(self, filename):
        """
        Attempt to delete files or folders in the given path.
        Just log a warning and carry on if any can't be deleted.
        A symbolic link is removed itself; whatever it points to is left untouched.
        :param filename: name of file or folder to delete
        """
        if os.path.islink(filename):
            # isfile/isdir follow links, so handle links first to avoid
            # changing or emptying the link target
            try:
                os.unlink(filename)
            except OSError:
                self.logger.warning('could not delete {}'.format(filename))
            return

        if os.path.isfile(filename):
            # single file - delete it and return
            try:
                self._makeWritable(filename)
                os.unlink(filename)
            except OSError:
                self.logger.warning('could not delete {}'.format(filename))
            return

        if os.path.isdir(filename):
            # directory - delete subfiles and folders then delete it
            try:
                children = os.listdir(filename)
            except OSError:
                self.logger.warning('could not delete {}'.format(filename))
                return
            for child in children:
                self.fileDelete(os.path.join(filename, child))
            try:
                self._makeWritable(filename)
                os.rmdir(filename)
            except OSError:
                self.logger.warning('could not delete {}'.format(filename))

    def fileRename(self, filename, newname):
        """
        Attempt to rename a file. Log a warning if there is an error
        :param filename: Old name
        :param newname: New name
        """
        try:
            # clear a read-only flag without dropping the file's other permissions
            self._makeWritable(filename)
            os.rename(filename, newname)
        except OSError:
            self.logger.warning('could not rename {0} to {1}'.format(filename, newname))

    def fileCopy(self, filename, dest):
        """
        Attempt to copy a file. Log a warning if there is an error
        :param filename: File to copy
        :param dest: Destination directory for copied file. Will be created if not found
        """
        try:
            if not os.path.exists(dest):
                self.makeDir(dest)
            shutil.copy2(filename, os.path.join(dest, os.path.basename(filename)))
        except (OSError, IOError, shutil.Error):
            # on Python 2 copy2 raises IOError / shutil.Error rather than OSError
            self.logger.warning('could not copy {0} to {1}'.format(filename, dest))

    def makeDir(self, dirname):
        """
        Make a directory. If directory name does not include pardir (..), also
        makes any parent dirs necessary
        :param dirname: Directory name to create
        """
        try:
            if os.path.pardir in dirname:
                os.mkdir(dirname)
            else:
                # best to use recursive make dir, but
                # only works if dest does not contain pardir (..)
                os.makedirs(dirname)
        except OSError:
            self.logger.error('could not create directory {}'.format(dirname))

    @staticmethod
    def getFileSize(filename):
        """
        Find the size of a file
        :param filename: Full file path
        :return: File size or 0 if there is an error
        """
        try:
            return os.path.getsize(filename)
        except OSError:
            return 0

    @staticmethod
    def _robocopyCommand(src, dest, retries, subfolders):
        """
        Build the robocopy argument list used by doRobocopy
        :param src: Source folder
        :param dest: Destination folder
        :param retries: Value for the /r: option
        :param subfolders: True to include the /s option
        :return: List of command line arguments, starting with 'robocopy'
        """
        command = ['robocopy', src, dest]
        if subfolders:
            # only add the switch when wanted; an empty-string argument would be
            # passed to robocopy as a real (blank) parameter
            command.append('/s')
        # NOTE(review): robocopy documents /mir as /e plus /purge, so subfolders
        # are probably still mirrored when subfolders is False - confirm intent
        command.extend([
            '/mir', '/r:{}'.format(retries), '/w:5', '/tee', '/fft', '/dst', '/dcopy:t',
            # default options - avoid lock and system files
            '/xf', '._*', '.ds_store', 'pagefile.sys', 'thumbs.db', '*.ldb', '*.lock',
            '*.lck', 'ntuser.dat*', 'Thumbs.db:encryptable',
            # no embedded quotes: subprocess quotes list arguments itself
            '/xd', '.*', 'System Volume Information', 'RECYCLER',
        ])
        return command

    def _logRobocopyResult(self, returncode):
        """
        Translate a robocopy exit code into log messages
        :param returncode: Exit code returned by robocopy (a set of bit flags)
        """
        if returncode == 0:
            self.logger.warning('no files copied - no changes detected')
        if returncode & 2 != 0:
            self.logger.info('extra files detected')
        if returncode & 4 != 0:
            self.logger.warning('mismatched files detected')
        if returncode & 8 != 0:
            self.logger.error('failed to copy files')
        if returncode & 16 != 0:
            self.logger.error('serious copy error')

    def doRobocopy(self, src, dest, retries=5, subfolders=True):
        """
        Copy from one folder to another using robocopy.
        Any errors are written to the log
        :param src: Source folder
        :param dest: Destination folder
        :param retries: Number of retries if there is an error (default 5)
        :param subfolders: Copy subfolders too (default true)
        """
        command = self._robocopyCommand(src, dest, retries, subfolders)
        try:
            # call() rather than check_call(): robocopy exits non-zero on success,
            # and exit code 0 ("nothing copied") must still reach the log
            returncode = subprocess.call(command)
        except OSError:
            # raised when the robocopy executable cannot be started
            self.logger.error('could not run robocopy')
            return
        self._logRobocopyResult(returncode)

    def logToFile(self, filepath, rotate=False):
        """
        Send log output to a file
        :param filepath: Full path of file
        :param rotate: Use a rotating file handler, default is False
        """
        if rotate:
            fh = logging.handlers.RotatingFileHandler(filepath, maxBytes=1000000, backupCount=3)
        else:
            fh = logging.FileHandler(filepath)
        fh.setLevel(logging.DEBUG)
        ff = logging.Formatter('%(asctime)s: %(levelname)s: %(name)s: %(message)s')
        fh.setFormatter(ff)
        self.logger.addHandler(fh)

    def logBasicConfig(self):
        """
        Use default log parameters
        """
        logging.basicConfig()

    def logAdvancedConfig(self):
        """
        Use a more advanced log format: info level just logs the message, but other levels
        include the level name (and debug also the file name and line number)
        """
        self.logger.setLevel(logging.DEBUG)
        sh = logging.StreamHandler(sys.stderr)
        sh.setFormatter(SpecialFormatter())
        sh.setLevel(logging.DEBUG)
        self.logger.addHandler(sh)

    def logFormat(self, format):
        """
        Define log format using attributes such as %(asctime)s %(filename)s %(name)s
        %(funcName)s %(levelname)s %(levelno)s %(lineno)d %(message)s %(thread)d %(threadName)s
        This format is applied to all handlers for the current logger
        :param format: Format string passed to logging.Formatter
        """
        fmt = logging.Formatter(format)
        for h in self.logger.handlers:
            h.setFormatter(fmt)

    def logLevel(self, level):
        """
        Set current logger level
        :param level: Logging level, e.g. logging.DEBUG
        """
        self.logger.setLevel(level)

    def writeLog(self, msg, level=logging.INFO):
        """
        Write a log message at the given level (default info). The message is still
        subject to the current logger and handler levels
        :param msg: Message to log
        :param level: Logging level for the message
        """
        self.logger.log(level, msg)

    def writeDebug(self, msg):
        """
        Write a debug message to the current logger
        """
        self.logger.debug(msg)

    def writeInfo(self, msg):
        """
        Write an info message to the current logger
        """
        self.logger.info(msg)

    def writeWarning(self, msg):
        """
        Write a warning message to the current logger
        """
        self.logger.warning(msg)

    def writeError(self, msg):
        """
        Write an error message to the current logger
        """
        self.logger.error(msg)

    def writeCritical(self, msg):
        """
        Write a critical message to the current logger
        """
        self.logger.critical(msg)
@jontwo
Copy link
Author

jontwo commented Dec 21, 2016

Typical usage:

import ioutils
io = ioutils.IOUtils()
text = io.fileRead('path/to/file')

etc

@jontwo
Copy link
Author

jontwo commented Mar 1, 2017

Added some logging methods

@jontwo
Copy link
Author

jontwo commented Jul 31, 2017

Added single file copy and updated Robocopy options

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment