Skip to content

Instantly share code, notes, and snippets.

@DanEdens
Forked from FulcronZ/main.py
Last active January 3, 2023 19:17
Show Gist options
  • Save DanEdens/08dbb8c7411d3feb4b3b7e600e79f345 to your computer and use it in GitHub Desktop.
Save DanEdens/08dbb8c7411d3feb4b3b7e600e79f345 to your computer and use it in GitHub Desktop.
Python MQTT Logging Handler
import errno
import logging
import os
import re
import subprocess
import time
from datetime import datetime
from pathlib import Path
from typing import List
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
# Create utils specific fallback logger for Debugging debug mode
logger = logging.getLogger(__name__)
fileDate = datetime.now().strftime("%Y-%m-%d")
os.environ['ROOT_DIR'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')
def establishBroker():
"""
Connect to the MQTT broker for logger mqttHandler stream
:return:
"""
_client = mqtt.Client()
_client.connect(host=os.environ.get('AWSIP', 'localhost'),
port=int(os.environ.get('AWSPORT', 1884))
)
return _client
def makeLogger(name: str = __name__, log_to_file: bool = False,
log_level: str = os.getenv("project_debug", 'DEBUG')) -> logging.Logger:
"""
Create the project wide logger.
:param name: The name of the logger.
:param log_to_file: Whether to log to a file.
:param log_level: The log level to use (e.g. 'DEBUG', 'INFO').
:return: A logger object.
"""
# name = name.replace(".", "/")
_format = '%(asctime)s - %(module)s - %(message)s' if log_level == 'DEBUG' else '%(asctime)s - %(message)s'
log = logging.getLogger(name)
log.setLevel(log_level)
if log_to_file:
filename = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}.log"
_log = ensureExists(Path(os.environ['ROOT_DIR']).joinpath(f"data//{filename}"))
file_handler = logging.FileHandler(_log, mode='a')
file_handler.setFormatter(logging.Formatter(_format))
log.addHandler(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(logging.Formatter(_format))
log.addHandler(stream_handler)
my_handler = mqttHandler(topic=f'log/{name}')
log.addHandler(my_handler)
return log
def remove_file(files: List[str]) -> bool:
"""
Removes old copies of files if they exist.
:param files: The names of the files to remove.
:return: `True` if all files were removed successfully, `False` otherwise.
"""
success = True
for f in files:
try:
os.remove(f)
logger.debug(f'Removing previous copy of {f}..')
except OSError:
success = False
return success
def linePrepender(file, line, depth: int = 0, mode: int = 0):
"""
Prepends given line to given file at depth.
:param file: Filepath to write too
:param line: str to write
:param depth: # of Lines to move away from mode
:param mode: 0=Top,1=current,2=Bottom
:return:
"""
with open(file, 'r+') as _file:
_file.seek(depth, mode)
_file.write(line.rstrip('\r\n') + '\n' + _file.read())
def ensureExists(path):
"""
Accepts path to file, then creates the directory path if it does not exist
:param path:
:return:
"""
if not os.path.exists(os.path.dirname(path)):
try:
os.makedirs(os.path.dirname(path))
except OSError as exc: # Guard against race condition
if exc.errno != errno.EEXIST:
raise
return path
class mqttHandler(logging.Handler):
"""
A handler class which writes logging records, appropriately formatted,
to a MQTT server to a topic.
"""
def __init__(
self,
_hostName=os.environ.get('AWSIP', 'localhost'),
topic=f'log',
qos=1, retain=True,
_port=int(os.environ.get('AWSPORT', 1884)),
client_id='',
keepalive=60,
will=None,
auth=None,
tls=None,
protocol=3,
transport='tcp',
):
logging.Handler.__init__(self)
self.topic = topic
self.qos = qos
self.retain = retain
self.hostname = _hostName
self.port = _port
self.client_id = client_id
self.keepalive = keepalive
self.will = will
self.auth = auth
self.tls = tls
self.protocol = protocol
self.transport = transport
def emit(self, record: str):
"""
Publish a single formatted logging record to a broker,
then disconnect cleanly
:type record: str
"""
msg = self.format(record)
print(f"{self.topic}=:={msg}")
publish.single(
self.topic,
msg,
self.qos,
self.retain,
hostname=self.hostname,
port=self.port,
client_id=self.client_id,
keepalive=self.keepalive,
will=self.will,
auth=self.auth,
tls=self.tls,
protocol=self.protocol,
transport=self.transport
)
def post(topic: str, payload: str, retain: bool = False,
_client=establishBroker()):
"""
Post msg to MQTT broker
:type _client: object
:type topic: str
:type payload: str
:type retain: bool
:param _client: Logging handler. By default, it is created by this module
:param retain: Retain topic on broker
:param topic: Project name
:param payload: Sensor Data
"""
topic = str(f'project_name/{topic}')
payload = str(payload)
try:
_client.publish(topic=topic, payload=payload, qos=0, retain=retain)
except ValueError:
logger.debug(
f"pub Failed because of wildcard: {str(topic)}=:={str(payload)}")
logger.debug(f"Attempting fix...")
try:
tame_t = topic.replace("+", "_")
tame_topic = tame_t.replace("#", "_")
tame_p = payload.replace("+", "_")
tame_payload = tame_p.replace("#", "_")
_client.publish(topic=str(tame_topic), payload=str(tame_payload),
qos=1, retain=retain)
logger.debug("Fix successful, Sending data...")
except Exception as error:
logger.info(f"Fix Failed. Bug report sent.")
_client.publish("project_name/error", str(error), qos=1, retain=True)
def run_command(command: str) -> str:
"""
Run a command in the shell.
:param command: The command to run.
:type command: str
:return: The output of the command.
:rtype: str
"""
process = subprocess.run(command, shell=True, capture_output=True,
text=True)
return process.stdout
def ping_ip(ip_address="192.168.0.1") -> bool:
"""Pings the given IP address until it is successful.
:param ip_address: The IP address to ping.
:type ip_address: str
:returns: bool
.. note:: This function uses the `ping` command to ping the given IP address. It will continue to run
the `ping` command until it is successful (i.e. the `ping` command returns a return code of 0)
"""
while True:
result = subprocess.run(["ping", "-c", "1", ip_address],
stdout=subprocess.PIPE)
if result.returncode == 0:
# Ping was successful
return True
else:
logger.debug(f"Waiting for DUT to power on: {result.returncode}")
time.sleep(10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment