Skip to content

Instantly share code, notes, and snippets.

@nbergen
Forked from Mauker1/octoprint_ackackack.py
Last active April 7, 2024 19:21
Show Gist options
  • Save nbergen/abb07a55df30b05f6aa8ce6e2d771aa3 to your computer and use it in GitHub Desktop.
Save nbergen/abb07a55df30b05f6aa8ce6e2d771aa3 to your computer and use it in GitHub Desktop.
OctoPrint-AckAckAck. Install by placing octoprint_ackackack.py in ~/.octoprint/plugins or (OctoPrint 1.8.3+) by pasting the RAW(!) gist URL into Plugin Manager > Get More > from URL
# coding=utf-8
from __future__ import absolute_import
import octoprint.plugin
import logging
from collections import deque
class AckAckAckPlugin(octoprint.plugin.StartupPlugin, octoprint.plugin.SettingsPlugin, octoprint.plugin.EventHandlerPlugin):
    """Detect garbled ``ok`` acknowledgements in received lines and repair them.

    Every received line is checked against a list of known corruption
    patterns. When one matches, the incident is written to the plugin log
    and the line is replaced by a clean ``"ok"``; otherwise the line is
    passed through unchanged.
    """

    def __init__(self):
        super(AckAckAckPlugin, self).__init__()
        # Rolling window of the 10 most recent sent/received lines. It is only
        # filled by the hooks below; nothing in this class reads it back.
        self._comm_tail = deque(maxlen=10)
        # Number of lines received without a detected garble since the last
        # incident (reported in the log when the next incident happens).
        self.good_count = 0

    def on_startup(self, host, port):
        """Route the plugin logger to its own rotating log file.

        ``host`` and ``port`` are supplied by the caller and are not used.
        """
        # Imported lazily, as in the original code, so the handler module is
        # only loaded once the plugin is actually started.
        from octoprint.logging.handlers import CleaningTimedRotatingFileHandler

        log_path = self._settings.get_plugin_logfile_path()
        file_handler = CleaningTimedRotatingFileHandler(
            log_path, when="D", backupCount=10
        )
        file_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
        file_handler.setLevel(logging.DEBUG)

        self._logger.addHandler(file_handler)
        self._logger.setLevel(logging.DEBUG)
        # Keep these records out of the parent loggers' handlers.
        self._logger.propagate = False

    def handleOkErr(self, err, line):
        """Log a detected garble and return the replacement line.

        :param err: human-readable description of what was detected
        :param line: the offending line as received
        :return: always the string ``"ok"``
        """
        separator = "-" * 80
        self._logger.error(separator)
        # Lazy %-style arguments instead of f-strings: keeps the module
        # importable on every interpreter covered by the declared
        # python compatibility range.
        self._logger.warning(
            "Recv WRN after %s lines: %s!", self.good_count, err
        )
        self._logger.warning(" %s --> ok", line.rstrip())
        self._logger.error(separator)
        self.good_count = 0
        return "ok"

    def _detect_garble(self, line):
        """Return a description of the garble found in ``line``, or ``None``.

        The checks run in order and the first match wins.
        """
        if "o\\x00k" in line or "o\x00k" in line:
            return "Garbled OK detected: o<NULL>k"
        if line.count("T:") > 1:
            return "Pessimistically assuming garbled Temperature string ate the OK"
        if "ok" in line and line.rstrip() != "ok":
            return "Garbled OK detected: {}".format(line.rstrip())
        if "ok " in line:
            return "Garbled OK detected: ok<SPC>"
        if 'to "Starting"' in line:
            return "Pessimistically assuming garbled communication on starting print"
        # The specific "...k" endings must be tested before the generic one:
        # each of them also satisfies the generic test, which would otherwise
        # shadow them and hide the more precise diagnostic.
        if line.endswith(" /k\n"):
            return "Garbled OK detected: <SPC><SLASH>k"
        if line.endswith("/k\n"):
            return "Garbled OK detected: <SLASH>k"
        if line.endswith(" k\n"):
            return "Garbled OK detected: <SPC>k"
        if line.endswith("k\n") and not line.endswith("ok\n"):
            return "Garbled OK detected: k"
        if "\x00" in line:
            return "Potential garble detected:<NULL>"
        return None

    def on_gcode_received(self, comm, line, *args, **kwargs):
        """Hook handler for received lines.

        :param comm: communication object passed by the caller (unused)
        :param line: the received line
        :return: ``"ok"`` if the line was recognised as garbled, otherwise
            ``line`` unchanged
        """
        self._comm_tail.append("Recv: {}".format(line))

        err = self._detect_garble(line)
        if err is not None:
            return self.handleOkErr(err, line)

        # A timeout message is only reported; the line itself still counts as
        # good and is passed through untouched.
        if "Communication timeout" in line:
            self._logger.error("!!!!! Undetected communication timeout !!!!!")
        self.good_count += 1
        return line

    def on_gcode_sent(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwargs):
        """Hook handler for sent commands: remember ``cmd`` in the rolling tail."""
        self._comm_tail.append("Send: {}".format(cmd))
# Plugin metadata (plain string constants).
__plugin_name__ = "OctoPrint-AckAckAck"
__plugin_pythoncompat__ = ">=2.7,<4"

# One shared plugin instance; both hook handlers below are bound to it.
__plugin_implementation__ = AckAckAckPlugin()

# Map each hook name to the bound method that handles it.
__plugin_hooks__ = {}
__plugin_hooks__["octoprint.comm.protocol.gcode.received"] = (
    __plugin_implementation__.on_gcode_received
)
__plugin_hooks__["octoprint.comm.protocol.gcode.sent"] = (
    __plugin_implementation__.on_gcode_sent
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment