-
-
Save nbergen/abb07a55df30b05f6aa8ce6e2d771aa3 to your computer and use it in GitHub Desktop.
OctoPrint-AckAckAck. Install by placing octoprint_ackackack.py in ~/.octoprint/plugins or (OctoPrint 1.8.3+) by pasting the RAW(!) gist URL into Plugin Manager > Get More > from URL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
from __future__ import absolute_import | |
import octoprint.plugin | |
import logging | |
from collections import deque | |
class AckAckAckPlugin(
    octoprint.plugin.StartupPlugin,
    octoprint.plugin.SettingsPlugin,
    octoprint.plugin.EventHandlerPlugin,
):
    """Replace received lines that look like a damaged ``ok`` with a clean ``ok``.

    Every received line is inspected by :meth:`on_gcode_received`. When one of
    the heuristics matches, the incident is written to the plugin logger and
    the line is replaced by the literal string ``"ok"``; otherwise the line is
    passed through unchanged.
    """

    # Capacity of the rolling buffer of recent "Send:"/"Recv:" lines.
    _COMM_TAIL_SIZE = 10

    # Line endings treated as a damaged "ok", paired with their diagnostic.
    # Ordered most specific first: every entry also ends in "k\n", so the
    # generic "k" check has to run after these or they could never match.
    _GARBLED_SUFFIXES = (
        (" /k\n", "Garbled OK detected: <SPC><SLASH>k"),
        ("/k\n", "Garbled OK detected: <SLASH>k"),
        (" k\n", "Garbled OK detected: <SPC>k"),
    )

    def __init__(self):
        super().__init__()
        # Most recent sent/received lines, oldest dropped automatically.
        # NOTE(review): nothing in this class reads the buffer back yet.
        self._comm_tail = deque(maxlen=self._COMM_TAIL_SIZE)
        # Lines received since the last detected garble.
        self.good_count = 0

    def on_startup(self, host, port):
        """Attach a dedicated rotating log file to the plugin logger.

        Args:
            host: Unused.
            port: Unused.
        """
        from octoprint.logging.handlers import CleaningTimedRotatingFileHandler

        log_path = self._settings.get_plugin_logfile_path()
        file_handler = CleaningTimedRotatingFileHandler(
            log_path,
            when="D",
            backupCount=10,
        )
        file_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
        file_handler.setLevel(logging.DEBUG)

        self._logger.addHandler(file_handler)
        self._logger.setLevel(logging.DEBUG)
        # Keep these records out of the handlers of ancestor loggers.
        self._logger.propagate = False

    def handleOkErr(self, err, line):
        """Log a detected garble and return the replacement acknowledgement.

        Args:
            err: Description of what was detected.
            line: The received line as it arrived.

        Returns:
            The literal string ``"ok"``.
        """
        separator = "-" * 80
        self._logger.error(separator)
        # Logger.warn is a deprecated alias of Logger.warning.
        self._logger.warning(f'Recv WRN after {self.good_count} lines: {err}!')
        self._logger.warning(f' {line.rstrip()} --> ok')
        self._logger.error(separator)
        self.good_count = 0
        return "ok"

    @classmethod
    def _detect_garble(cls, line):
        """Return a description of the garble found in *line*, or ``None``."""
        # Either the escaped text "\x00" or a real NUL byte between "o" and "k".
        if "o\\x00k" in line or "o\x00k" in line:
            return 'Garbled OK detected: o<NULL>k'
        if line.count("T:") > 1:
            return "Pessimistically assuming garbled Temperature string ate the OK"
        if 'ok' in line and line.rstrip() != 'ok':
            return f'Garbled OK detected: {line.rstrip()}'
        # Only reachable when the line is "ok" followed by trailing whitespace
        # that starts with a space; anything else was caught just above.
        if 'ok ' in line:
            return 'Garbled OK detected: ok<SPC>'
        if 'to "Starting"' in line:
            return "Pessimistically assuming garbled communication on starting print"
        for suffix, message in cls._GARBLED_SUFFIXES:
            if line.endswith(suffix):
                return message
        # The "ok\n" guard keeps a pristine acknowledgement from being flagged.
        if line.endswith('k\n') and not line.endswith('ok\n'):
            return "Garbled OK detected: k"
        if "\x00" in line:
            return "Potential garble detected:<NULL>"
        return None

    def on_gcode_received(self, comm, line, *args, **kwargs):
        """Inspect a received line and substitute ``"ok"`` if it looks garbled.

        Args:
            comm: Unused.
            line: The received line.

        Returns:
            ``"ok"`` when a garble was detected, otherwise *line* unchanged.
        """
        self._comm_tail.append("Recv: {}".format(line))

        err = self._detect_garble(line)
        if err:
            return self.handleOkErr(err, line)

        # A timeout message is only reported; it still counts as a good line.
        if "Communication timeout" in line:
            self._logger.error("!!!!! Undetected communication timeout !!!!!")
        self.good_count += 1
        return line

    def on_gcode_sent(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwargs):
        """Record a sent command in the rolling buffer."""
        self._comm_tail.append("Send: {}".format(cmd))
# Plugin registration globals for OctoPrint.
__plugin_name__ = "OctoPrint-AckAckAck"
# This file uses f-strings, which are a syntax error before Python 3.6, so the
# declared range must not claim Python 2.7 support.
__plugin_pythoncompat__ = ">=3.6,<4"
__plugin_implementation__ = AckAckAckPlugin()
# Route every received line and every sent command through the plugin.
__plugin_hooks__ = {
    "octoprint.comm.protocol.gcode.received": __plugin_implementation__.on_gcode_received,
    "octoprint.comm.protocol.gcode.sent": __plugin_implementation__.on_gcode_sent,
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment