Skip to content

Instantly share code, notes, and snippets.

@daniel-garcia
Created April 16, 2012 20:03
Show Gist options
  • Save daniel-garcia/2401137 to your computer and use it in GitHub Desktop.
Save daniel-garcia/2401137 to your computer and use it in GitHub Desktop.
Take a snapshot of a log file every time a given pattern appears on a line
#!/usr/bin/env python
import time, os
import optparse
import fnmatch
import datetime
import subprocess
import logging
_LOG_FORMAT = "%(asctime)s %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
_LOG = logging.getLogger('log_snapshot')
def main():
usage = "usage: %prog [LOGFILE] [PATTERN] [OTHER LOGS]"
parser = optparse.OptionParser(usage=usage)
parser.add_option("--debounce-seconds", default=60, type='int',
help="number of seconds to wait between snapshots.")
options, args = parser.parse_args()
if len(args) < 2:
parser.print_usage()
parser.exit(1)
logFile = args[0]
pattern = args[1]
otherLogFiles = args[2:]
if not os.path.exists(logFile):
print "File % does not exist." % logFile
parser.exit(1)
_LOG.info("logFile: %s", logFile)
_LOG.info("pattern: %s", pattern)
tailLogAndSnapshot(logFile, pattern, options.debounce_seconds, otherLogFiles)
def snapshotLogfile(filename, ts=None):
# snapshot the log file
if ts == None:
ts = datetime.datetime.now()
cmd = "cp %r '%s.snapshot_%s' " % (
filename, filename,
ts.isoformat(),
)
try:
_LOG.info("Taking a snapshot of %r", filename)
lastSnapshot = time.time()
subprocess.check_call(cmd, shell=True)
except Exception as ex:
_LOG.info("An error occurred while taking a snapshot of %r", filename)
_LOG.exception(ex)
def tailLogAndSnapshot(logFile, pattern, debounce_seconds, otherLogFiles):
lastSnapshot = -1
while True:
try:
with open(logFile,'r') as log:
#Find the size of the file and move to the end
st_results = os.stat(logFile)
st_size = st_results[6]
log.seek(st_size)
while True:
where = log.tell()
line = log.readline()
if not line:
time.sleep(1)
log.seek(where)
else:
if fnmatch.fnmatch(line, pattern):
if lastSnapshot > (time.time() - debounce_seconds):
_LOG.info("Pattern matched line but debouncing this match.")
continue
ts = datetime.datetime.now()
snapshotLogfile(log.name, ts)
for f in otherLogFiles:
snapshotLogfile(f, ts)
except KeyboardInterrupt:
break
except Exception as ex:
log.exception(ex)
time.sleep(1)
if __name__=='__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment