Last active
February 3, 2022 00:11
-
-
Save TheCataliasTNT2k/5447b7d0c44b3c66e4765774b2d66026 to your computer and use it in GitHub Desktop.
This script watches your "Bresser REX" or "Flashforge Adventurer 3" for loss of temperature. This has happened to me a few times, and the printer does not recognise it on its own. This script will pause the printer when this happens. Most of the code is copied from here: https://github.com/01F0/flashforge-finder-api Caution: Untested so far, because this happens rarely.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import re | |
import socket | |
import time | |
from enum import Enum, auto | |
from os import system | |
# --- Connection settings ---------------------------------------------------
# HOST is empty here; presumably it has to be set to the printer's address
# before the script is run -- TODO confirm.
HOST = ""
PORT = 8899
TIMEOUT_SECONDS = 10  # applied to the socket below via settimeout()
BUFFER_SIZE = 1024  # maximum number of bytes read per reply

# --- Command strings sent to the printer -----------------------------------
# Each command is terminated with CR LF; the names say what each one asks for.
request_control_message = "~M601 S1\r\n"
request_info_message = "~M115\r\n"
request_head_position = "~M114\r\n"
request_temp = "~M105\r\n"
request_progress = "~M27\r\n"
request_status = "~M119\r\n"
request_pause = "~M25\r\n"

# --- Shared connection state -----------------------------------------------
# Number of failed attempts since the last successful exchange.
retries = 0
# The socket is created unconnected; send_and_receive() connects it on demand.
s = socket.socket()
s.settimeout(TIMEOUT_SECONDS)
def regex_for_field(field_name):
    """Build the pattern capturing the value of a ``<field_name>: <value>`` line.

    Example of a line this is meant for: ``Machine Type: Flashforge Finder``.
    The single capture group holds the text between the colon (and an
    optional space) and the terminating CR LF.
    """
    value_until_crlf = ': ?(.+?)\\r\\n'
    return ''.join([field_name, value_until_crlf])
def regex_for_coordinates(field_name):
    """Build the pattern capturing one axis value from a position line.

    Example of a line this is meant for: ``X:-19.19 Y:6 Z:7.3 A:846.11 B:0``.
    The capture group holds everything between ``<field_name>:`` and the
    next space.
    """
    value_until_space = ':(.+?) '
    return ''.join([field_name, value_until_space])
def regex_for_current_temperature():
    """Return the pattern capturing the current ``T0`` temperature.

    Example of a line this is meant for: ``T0:210 /210 B:0 /0``.
    The capture group holds the text after ``T0:`` up to the next space.
    """
    label = 'T0:'
    value_until_space = '([0-9].*?) '
    return label + value_until_space
def regex_for_target_temperature():
    """Return the pattern capturing the first target temperature.

    Example of a line this is meant for: ``T0:210 /210 B:0 /0``.
    The capture group holds the text after the first ``/`` that is followed
    by a digit, up to the next space.
    """
    # Raw string: the escaped slash is not a valid Python string escape, so a
    # plain literal triggers an "invalid escape sequence" warning. The
    # resulting pattern text is unchanged.
    return r'\/([0-9].*?) '
def regex_for_progress():
    """Return the pattern capturing ``<printed>/<total>`` from a progress reply.

    The first group is the text before the slash (starting at a digit), the
    second group the text after it up to the carriage return.
    NOTE(review): the reply presumably contains a line such as
    ``SD printing byte 12/100`` -- confirm against a real printer reply.
    """
    # Raw string: the escaped slash is not a valid Python string escape, so a
    # plain literal triggers an "invalid escape sequence" warning. The
    # resulting pattern text is unchanged.
    return r'([0-9].*)\/([0-9].*?)\r'
def send_and_receive(message_data):
    """Send one command to the printer and return the decoded reply.

    The module-level socket ``s`` is connected on demand: whenever sending or
    receiving fails (this includes the first call, where ``s`` has not been
    connected yet) the socket is replaced by a new one connected to
    ``(HOST, PORT)`` and the command is sent again.

    Args:
        message_data: Command text; it is encoded and sent unchanged.

    Returns:
        The reply (at most ``BUFFER_SIZE`` bytes) decoded to ``str``.

    Raises:
        SystemExit: With exit code 1, after printing "Retries exceeded!",
            when an attempt fails and 10 failed attempts have already been
            counted since the last successful exchange.
    """
    global s, retries
    while True:
        try:
            s.send(message_data.encode())
            data = s.recv(BUFFER_SIZE)
            if not data:
                # recv() returning no bytes means the peer closed the
                # connection; treat it like any other connection failure.
                raise ConnectionError("connection closed by printer")
        except OSError:
            # socket timeouts and connection errors are all OSError subclasses.
            if retries >= 10:
                print("Retries exceeded!")
                raise SystemExit(1)
            retries += 1
            # A socket that has been connected once cannot be connected
            # again, so start over with a fresh one.
            # NOTE(review): the printer may expect the control request again
            # after a reconnect -- confirm on real hardware.
            s.close()
            s = socket.socket()
            s.settimeout(TIMEOUT_SECONDS)
            try:
                s.connect((HOST, PORT))
            except OSError:
                # Not fatal here: the send at the top of the loop fails on
                # the unconnected socket and is counted as the next retry.
                pass
            continue
        retries = 0
        return data.decode()
def get_info():
    """Request printer information and return the extracted fields.

    Sends ``request_info_message`` and returns a dict mapping each of
    'Type', 'Name', 'Firmware', 'SN', 'X' and 'Tool Count' to the string
    captured for it from the reply.
    """
    reply = send_and_receive(request_info_message)
    wanted = ('Type', 'Name', 'Firmware', 'SN', 'X', 'Tool Count')
    return {
        name: re.search(regex_for_field(name), reply).group(1)
        for name in wanted
    }
def get_head_position():
    """Request the head position and return its X, Y and Z values.

    Sends ``request_head_position`` and returns a dict with the keys 'X',
    'Y' and 'Z'; the values are the captured strings, not numbers.
    """
    reply = send_and_receive(request_head_position)
    position = {}
    for axis in ('X', 'Y', 'Z'):
        match = re.search(regex_for_coordinates(axis), reply)
        position[axis] = match.group(1)
    return position
def get_temp():
    """Request the temperatures and return current and target value.

    Sends ``request_temp`` and returns a dict with the keys 'Temperature'
    and 'TargetTemperature'; both values are the captured strings.
    """
    reply = send_and_receive(request_temp)
    current = re.search(regex_for_current_temperature(), reply).group(1)
    target = re.search(regex_for_target_temperature(), reply).group(1)
    return {'Temperature': current, 'TargetTemperature': target}
def get_progress():
    """Request the print progress and return it as byte counts and a percentage.

    Sends ``request_progress`` and parses the two numbers matched by
    ``regex_for_progress()``.

    Returns:
        A dict with the integer entries 'BytesPrinted', 'BytesTotal' and
        'PercentageCompleted'. The percentage is rounded down and is 0 when
        the total is 0.
    """
    reply = send_and_receive(request_progress)
    printed_text, total_text = re.search(regex_for_progress(), reply).groups()
    printed = int(printed_text)
    total = int(total_text)
    # Integer arithmetic on purpose: the float form int(printed / total * 100)
    # can come out one too low (29 of 100 yields 28 because of rounding error).
    percentage = printed * 100 // total if total else 0
    return {'BytesPrinted': printed,
            'BytesTotal': total,
            'PercentageCompleted': percentage}
def get_status():
    """Request the printer status and return the extracted fields.

    Sends ``request_status`` and returns a dict mapping each of 'Status',
    'MachineStatus', 'MoveMode' and 'Endstop' to the string captured for it
    from the reply.
    """
    reply = send_and_receive(request_status)
    wanted = ('Status', 'MachineStatus', 'MoveMode', 'Endstop')
    return {
        name: re.search(regex_for_field(name), reply).group(1)
        for name in wanted
    }
def pause():
    """Send the pause request and return the printer's decoded reply text."""
    return send_and_receive(request_pause)
def notification(message):
    """Show a critical desktop notification titled '3D Drucker:'.

    Runs ``notify-send`` through the shell (``os.system``) as ``$USER`` with
    the display and D-Bus session address hard-coded in the command line.

    Args:
        message: Notification body text.
    """
    # Local import keeps this block self-contained.
    from shlex import quote

    # quote() instead of hand-written single quotes: a message containing an
    # apostrophe would otherwise end the quoting early and break (or alter)
    # the shell command.
    body = quote(str(message))
    system(
        "sudo -u $USER DISPLAY=:0 DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/1000/bus "
        f"notify-send '3D Drucker:' {body} -u critical")
def seconds_to_string(seconds: float):
    """Format a duration in seconds as text, dropping the fractional part.

    The value is floored to whole seconds and rendered the way
    ``datetime.timedelta`` prints itself, e.g. ``1:01:01``.
    """
    whole_seconds = seconds // 1
    duration = datetime.timedelta(seconds=whole_seconds)
    return str(duration)
def cleanup():
    """Close the module-level socket ``s``."""
    # Only reads the global, so no ``global`` declaration is required.
    s.close()
class State(Enum):
    """Phases of the watchdog loop; values match what ``auto()`` assigns."""

    IDLE = 1     # initial state
    HEATING = 2  # set once the machine status is no longer "READY"
    HEATED = 3   # set once the temperature is close to the target
    PAUSED = 4   # set after the pause request was sent
# --- Watchdog state ---------------------------------------------------------
state: State = State.IDLE
last_percent = 0  # highest completion percentage printed so far
last_temp = 0  # temperature seen on the previous poll while paused

# Request control first; give up unless the reply contains "success".
if "success" not in send_and_receive(request_control_message).lower():
    notification("Programm abgebrochen!")
    cleanup()
    exit(1)
print("Programm gestarted!")

try:
    # Poll the printer once per second. The checks below are deliberately
    # sequential (not elif): a state set by one check can be picked up by a
    # later check within the same iteration.
    while True:
        status = get_status()
        progress = get_progress()
        head = get_head_position()
        temp = {k: int(v) for k, v in get_temp().items()}

        if status["MachineStatus"] == "READY":
            print("IDLE")

        # The machine left READY: a job has started.
        if state == State.IDLE and status["MachineStatus"] != "READY":
            state = State.HEATING
            print("Starte Aufheizen")

        if state == State.HEATING:
            print(f'HEATING {temp["Temperature"]} / {temp["TargetTemperature"]}')

        # Heated up: within 5 degrees of a target above 100.
        if (state == State.HEATING
                and temp["Temperature"] >= temp["TargetTemperature"] - 5
                and temp["TargetTemperature"] > 100):
            print("Temperatur erreicht!")
            state = State.HEATED
            started_printing = time.time()

        # Temperature loss: more than 20 degrees below the target while the
        # job is below 98 percent and the target is still above 100.
        if (state == State.HEATED
                and temp["Temperature"] < temp["TargetTemperature"] - 20
                and progress["PercentageCompleted"] < 98
                and temp["TargetTemperature"] > 100):
            print(temp)
            print("PAUSED")
            pause()
            notification("Extruder hat Temperatur verloren, pausiere!")
            state = State.PAUSED
            last_temp = temp["Temperature"]

        # While paused: a temperature higher than on the previous poll sends
        # the state machine back to HEATING.
        if state == State.PAUSED and last_temp < temp["Temperature"]:
            state = State.HEATING
        if state == State.PAUSED:
            last_temp = temp["Temperature"]

        # Back to READY after a job had started: the print is finished.
        if state != State.IDLE and status["MachineStatus"] == "READY":
            notification("Drucken fertig")
            print('Druck abgeschlossen!')
            cleanup()
            exit(0)

        if state == State.HEATED:
            # do not try to estimate the remaining time based on percent, it does not work!
            if progress["PercentageCompleted"] > last_percent:
                last_percent = progress["PercentageCompleted"]
                print(f'{last_percent}% abgeschlossen; Temperatur: {temp["Temperature"]} / {temp["TargetTemperature"]}; Head: {head}')

        time.sleep(1)
except KeyboardInterrupt:
    # Nothing to do here: the finally clause prints "Exiting" and cleans up.
    pass
except Exception as e:
    print(e)
    notification("Programm abgestürzt!")
finally:
    print("Exiting")
    cleanup()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment