Skip to content

Instantly share code, notes, and snippets.

@Lougarou
Created March 15, 2021 20:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Lougarou/927e8d6e9a4a9164e070c9f08443451f to your computer and use it in GitHub Desktop.
Save Lougarou/927e8d6e9a4a9164e070c9f08443451f to your computer and use it in GitHub Desktop.
import logging
from logging import handlers
import csv
import io
import time
import os
from datetime import datetime
class CSVFormatter(logging.Formatter):
    """Formatter that renders a record's message as one fully quoted CSV row.

    The record's ``msg`` is expected to be an iterable of field values
    (for example a list).  A plain string, bytes object or non-iterable
    value is treated as a single-column row instead of being split into
    characters or rejected by the csv module.
    """

    @staticmethod
    def _as_row(msg):
        """Return *msg* as an iterable of CSV fields."""
        if isinstance(msg, (str, bytes)):
            # Strings are iterable, but iterating would yield one column
            # per character, which is never what the caller meant.
            return [msg]
        try:
            iter(msg)
        except TypeError:
            return [msg]
        return msg

    def format(self, record):
        """Format *record* with its message replaced by a CSV row.

        The CSV text is produced with ``csv.QUOTE_ALL`` and has the csv
        module's trailing line terminator stripped.  ``record.msg`` is
        restored before returning so that other handlers attached to the
        same logger still receive the original message object.
        """
        original_msg = record.msg
        buffer = io.StringIO()
        csv.writer(buffer, quoting=csv.QUOTE_ALL).writerow(
            self._as_row(original_msg))
        record.msg = buffer.getvalue().strip()
        try:
            return super().format(record)
        finally:
            record.msg = original_msg
class CouldNotBeReady(Exception):
    """Raised by ``CSVTimedRotatingFileHandler.emit`` when the CSV header
    has still not been written after the retry budget is used up."""
class CSVTimedRotatingFileHandler(handlers.TimedRotatingFileHandler):
    """Timed rotating file handler that starts every log file with a CSV header.

    The header row is written (fully quoted) whenever the handler starts
    writing to an empty file: at construction, after each rollover, or on
    the first ``emit`` when the stream is opened lazily (``delay=True``).

    Args:
        filename, when, interval, backupCount, encoding, delay, utc, atTime:
            Forwarded unchanged to ``TimedRotatingFileHandler``.
        errors: Forwarded to ``TimedRotatingFileHandler`` only when it is
            not ``None``, because the base class accepts it only on newer
            Python versions.
        retryLimit: How many times ``emit`` may retry a failed header
            write before raising ``CouldNotBeReady``.  The budget is
            restored on every rollover.
        retryInterval: Seconds to sleep between header write retries.
        header: Iterable of column names, or a single string which is
            written as a one-column header.
    """

    def __init__(self, filename, when='D', interval=1, backupCount=0,
                 encoding=None, delay=False, utc=False, atTime=None,
                 errors=None, retryLimit=5, retryInterval=0.5,
                 header="NO HEADER SPECIFIED"):
        self.RETRY_LIMIT = retryLimit
        self._header = header
        self._retryLimit = retryLimit
        self._retryInterval = retryInterval
        self._hasHeader = False
        # Only pass `errors` through when the caller asked for it, so the
        # class keeps working on interpreters whose base class lacks it.
        extra = {} if errors is None else {"errors": errors}
        super().__init__(filename, when, interval, backupCount, encoding,
                         delay, utc, atTime, **extra)
        if self.stream is not None:
            # With delay=True there is no stream (and maybe no file) yet;
            # the header is then written by the first emit() instead.
            self._write_header_if_empty()

    def _header_row(self):
        """Return the configured header as an iterable of column names."""
        if isinstance(self._header, (str, bytes)):
            # A bare string would otherwise become one column per character.
            return [self._header]
        return self._header

    def _write_header_if_empty(self):
        """Open the stream if needed, write the header to an empty file and
        mark the handler as ready."""
        if self.stream is None:
            self.stream = self._open()
        if os.path.getsize(self.baseFilename) == 0:
            csv.writer(self.stream, quoting=csv.QUOTE_ALL).writerow(
                self._header_row())
            # Flush so the on-disk size reflects the header immediately.
            self.stream.flush()
        self._hasHeader = True

    def doRollover(self):
        """Rotate the log file and write the header to the fresh file."""
        self._hasHeader = False
        self._retryLimit = self.RETRY_LIMIT
        super().doRollover()
        self._write_header_if_empty()

    def emit(self, record):
        """Write *record*, first making sure the file has its header.

        Raises:
            CouldNotBeReady: if the header could not be written and the
                retry budget is exhausted.
        """
        while not self._hasHeader:
            try:
                self._write_header_if_empty()
            except OSError as exc:
                if self._retryLimit <= 0:
                    raise CouldNotBeReady(
                        "could not write the CSV header to %s"
                        % self.baseFilename) from exc
                self._retryLimit -= 1
                time.sleep(self._retryInterval)
        super().emit(record)
import random

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def main():
    """Demo: append a timestamp and a random number to ``log.csv`` once a
    second until the process is interrupted."""
    csv_handler = CSVTimedRotatingFileHandler(
        filename="log.csv", header=["time", "number"])  # to save to file
    csv_handler.setFormatter(CSVFormatter())
    logger.addHandler(csv_handler)
    try:
        while True:
            timestamp = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
            logger.info([timestamp, random.randint(10, 100)])
            time.sleep(1)
    finally:
        # Detach and close the handler so the file is released even when
        # the loop is interrupted (e.g. Ctrl-C).
        logger.removeHandler(csv_handler)
        csv_handler.close()


# Guarded so importing this module does not start the endless demo loop.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment