Created
February 2, 2022 17:51
-
-
Save TheCataliasTNT2k/2569674124a42f7d387e8a10fec21ee0 to your computer and use it in GitHub Desktop.
This script watches your "Bresser REX" or "Flashforge Adventurer 3" for loss of temperature. Happened to me a few times,
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import re
import socket
import time
from enum import Enum, auto
from os import system
from shlex import quote
# --- Connection settings -------------------------------------------------
HOST = ""  # printer address; empty as shipped, fill in before running
PORT = 8899
BUFFER_SIZE = 1024  # maximum number of bytes read per reply
TIMEOUT_SECONDS = 10  # timeout applied to the module-level socket

# --- Command strings, sent verbatim (each terminated by CR/LF) -----------
request_control_message = "~M601 S1\r\n"
request_info_message = "~M115\r\n"
request_head_position = "~M114\r\n"
request_temp = "~M105\r\n"
request_progress = "~M27\r\n"
request_status = "~M119\r\n"
request_pause = "~M25\r\n"

# --- Shared connection state used by send_and_receive() ------------------
retries = 0  # consecutive failed exchanges so far
s = socket.socket()  # created unconnected; connected on first use
s.settimeout(TIMEOUT_SECONDS)
def regex_for_field(field_name):
    """Build the pattern that captures the value of a ``<field_name>: value`` line.

    Example reply line: ``Machine Type: Flashforge Finder``.
    The single capture group holds the text between the colon (plus an
    optional space) and the terminating CR/LF. ``field_name`` is inserted
    into the pattern as-is, without regex escaping.
    """
    value_until_crlf = r': ?(.+?)\r\n'
    return field_name + value_until_crlf
def regex_for_coordinates(field_name):
    """Build the pattern that captures one axis value from a position reply.

    Example reply text: ``X:-19.19 Y:6 Z:7.3 A:846.11 B:0``.
    The capture group holds everything between ``<field_name>:`` and the
    next space.
    """
    return "".join((field_name, ":(.+?) "))
def regex_for_current_temperature():
    """Return the pattern that captures the current ``T0`` reading.

    Example reply text: ``T0:210 /210 B:0 /0``; the capture group holds the
    digits that follow ``T0:`` up to the next space.
    """
    current_reading = "T0:([0-9].*?) "
    return current_reading
def regex_for_target_temperature():
    """Return the pattern that captures the first target temperature.

    Example reply text: ``T0:210 /210 B:0 /0``; the capture group holds the
    digits that follow the first ``/`` up to the next space.
    """
    # Raw string: "\/" is not a valid escape in a normal string literal and
    # triggers a warning on current Python versions. The value is unchanged.
    return r'\/([0-9].*?) '
def regex_for_progress():
    """Return the pattern that captures ``<printed>/<total>`` from a progress reply.

    Group 1 is the number before the slash, group 2 the number after it up
    to the carriage return. Presumably the reply line looks like
    ``SD printing byte 13/100`` -- confirm against the printer's output.
    """
    # Raw string: "\/" is not a valid escape in a normal string literal and
    # triggers a warning on current Python versions. The value is unchanged.
    return r'([0-9].*)\/([0-9].*?)\r'
def send_and_receive(message_data):
    """Send ``message_data`` to the printer and return the decoded reply.

    Uses the module-level socket ``s``. The socket starts unconnected, so the
    first exchange fails and triggers the initial connect. Any socket error
    (including a timeout or the peer closing the connection) is retried on a
    freshly created socket, because a socket that is still connected cannot
    be passed to ``connect()`` again.

    The module-level ``retries`` counter tracks consecutive failures and is
    reset after every successful exchange. Once it has reached 10, the next
    failure prints "Retries exceeded!" and terminates the program with
    exit status 1 (``SystemExit``).

    :param message_data: command string to send (encoded before sending)
    :return: the reply, at most ``BUFFER_SIZE`` bytes, decoded to ``str``
    """
    global s, retries
    reconnect = False
    while True:
        try:
            if reconnect:
                # Replace the socket instead of reusing it: connect() on a
                # socket that is still connected raises an error.
                s.close()
                s = socket.socket()
                s.settimeout(TIMEOUT_SECONDS)
                s.connect((HOST, PORT))
            s.sendall(message_data.encode())
            data = s.recv(BUFFER_SIZE)
            if not data:
                # An empty read means the peer closed the connection.
                raise ConnectionResetError("connection closed by printer")
        except OSError:
            if retries >= 10:
                print("Retries exceeded!")
                raise SystemExit(1)
            retries += 1
            reconnect = True
            continue
        retries = 0
        return data.decode()
def get_info():
    """Query the printer's info reply and return selected fields as a dict.

    Keys are 'Type', 'Name', 'Firmware', 'SN', 'X' and 'Tool Count'; each
    value is the text captured by ``regex_for_field`` for that key. A field
    missing from the reply raises ``AttributeError``.
    """
    reply = send_and_receive(request_info_message)
    wanted = ('Type', 'Name', 'Firmware', 'SN', 'X', 'Tool Count')
    return {
        label: re.search(regex_for_field(label), reply).group(1)
        for label in wanted
    }
def get_head_position():
    """Query the head position and return the X, Y and Z values as a dict.

    Values are the raw strings captured by ``regex_for_coordinates``. An axis
    missing from the reply raises ``AttributeError``.
    """
    reply = send_and_receive(request_head_position)
    return {
        axis: re.search(regex_for_coordinates(axis), reply).group(1)
        for axis in ('X', 'Y', 'Z')
    }
def get_temp():
    """Query the temperature reply and return current and target readings.

    Returns a dict with keys 'Temperature' and 'TargetTemperature'. Both
    values are the captured strings, not numbers. A reply that does not match
    either pattern raises ``AttributeError``.
    """
    reply = send_and_receive(request_temp)
    current_match = re.search(regex_for_current_temperature(), reply)
    target_match = re.search(regex_for_target_temperature(), reply)
    return {
        'Temperature': current_match.group(1),
        'TargetTemperature': target_match.group(1),
    }
def get_progress():
    """Query the progress reply and return printed/total counts and a percentage.

    Returns a dict with integer values under 'BytesPrinted', 'BytesTotal' and
    'PercentageCompleted'. The percentage is rounded down, and is 0 when the
    reported total is 0. A reply that does not match ``regex_for_progress``
    raises ``AttributeError``.
    """
    info_result = send_and_receive(request_progress)
    regex_groups = re.search(regex_for_progress(), info_result).groups()
    printed = int(regex_groups[0])
    total = int(regex_groups[1])
    if total == 0:
        percentage = 0
    else:
        # Integer arithmetic: the float form int(printed / total * 100)
        # under-reports some values (29/100 evaluates to 28.999... -> 28).
        percentage = printed * 100 // total
    return {'BytesPrinted': printed,
            'BytesTotal': total,
            'PercentageCompleted': percentage}
def get_status():
    """Query the status reply and return selected fields as a dict.

    Keys are 'Status', 'MachineStatus', 'MoveMode' and 'Endstop'; each value
    is the text captured by ``regex_for_field`` for that key. A field missing
    from the reply raises ``AttributeError``.
    """
    reply = send_and_receive(request_status)
    wanted = ('Status', 'MachineStatus', 'MoveMode', 'Endstop')
    return {
        label: re.search(regex_for_field(label), reply).group(1)
        for label in wanted
    }
def pause():
    """Send the pause command and return the printer's raw reply text."""
    return send_and_receive(request_pause)
def notification(message):
    """Show a critical desktop notification titled '3D Drucker:' via ``notify-send``.

    The command runs through the shell with the user (``jo``), display
    (``:0``) and session-bus path hard-coded below. ``message`` is
    shell-quoted, so text containing apostrophes or other shell
    metacharacters is passed through literally instead of breaking the
    command line.

    :param message: notification body text
    """
    system(
        "sudo -u jo DISPLAY=:0 DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/1000/bus "
        f"notify-send '3D Drucker:' {quote(message)} -u critical")
def seconds_to_string(seconds: float):
    """Format a duration in seconds as ``H:MM:SS``, dropping the fractional part."""
    whole_seconds = seconds // 1  # round down to a whole number of seconds
    duration = datetime.timedelta(seconds=whole_seconds)
    return str(duration)
def cleanup():
    """Close the module-level printer socket ``s``."""
    s.close()
class State(Enum):
    """Phases tracked by the monitoring loop."""

    IDLE = 1  # initial state, before a running job has been seen
    HEATING = 2  # job seen, waiting for the reading to approach the target
    HEATED = 3  # reading reached the target range; drops are watched for
    PAUSED = 4  # the script paused the print after a temperature drop
# Monitoring state shared across loop iterations.
start = 0  # time.time() when a running job was first seen; 0 means "not yet"
started_printing = 0  # time.time() when the reading last reached the target range
state: State = State.IDLE
seconds_per_percent = 0  # estimated seconds needed per progress percent
last_percent = 0  # highest PercentageCompleted handled so far
last_temp = 0  # reading from the previous iteration while paused

# Send the control request first; without a "success" reply nothing else is done.
if "success" not in send_and_receive(request_control_message).lower():
    notification("Programm abgebrochen!")
    cleanup()
    raise SystemExit(1)

print("Programm gestarted!")
try:
    while True:
        status = get_status()
        progress = get_progress()
        head = get_head_position()
        temp = get_temp()

        # get_temp() returns the readings as strings; convert them once so
        # the threshold arithmetic and comparisons below are numeric.
        current_temp = float(temp["Temperature"])
        target_temp = float(temp["TargetTemperature"])
        machine_ready = status["MachineStatus"] == "READY"

        if machine_ready:
            print("IDLE")

        # First time the printer is seen busy: a job has started.
        if start == 0 and not machine_ready:
            start = time.time()
            state = State.HEATING

        if state == State.HEATING:
            print(f'HEATING {temp["Temperature"]} / {temp["TargetTemperature"]}')

        # Within 5 of the target counts as heated.
        if state == State.HEATING and current_temp >= target_temp - 5:
            state = State.HEATED
            started_printing = time.time()

        # More than 10 below the target while heated: pause and alert.
        if state == State.HEATED and current_temp < target_temp - 10:
            pause()
            notification("Extruder hat Temperatur verloren, pausiere!")
            state = State.PAUSED
            last_temp = current_temp
            print("PAUSED")

        # A rising reading while paused sends the state back to HEATING.
        if state == State.PAUSED and last_temp < current_temp:
            state = State.HEATING

        if state == State.PAUSED:
            last_temp = current_temp

        # A job was running and the printer reports READY again: finished.
        if state != State.IDLE and machine_ready:
            notification("Drucken fertig")
            print(
                f'Druck abgeschlossen! Druckzeit Zeit: {seconds_to_string(time.time() - started_printing)}; '
                f'Gesamtzeit: {seconds_to_string(time.time() - start)}')
            # The socket is closed by the ``finally`` clause below.
            raise SystemExit(0)

        if state == State.HEATED:
            if progress["PercentageCompleted"] > last_percent:
                if last_percent == 0:
                    # Base the estimate on the first observed progress step,
                    # divided by the percent actually reached in case the
                    # first report jumps by more than one percent.
                    seconds_per_percent = (
                        (time.time() - started_printing) / progress["PercentageCompleted"])
                last_percent = progress["PercentageCompleted"]
                print(
                    f'{last_percent}% abgeschlossen; '
                    f'Vergangene Zeit: {seconds_to_string(time.time() - start)}; '
                    f'Temperatur: {temp["Temperature"]} / {temp["TargetTemperature"]}; '
                    f'Head: {head} '
                    f'Verbleibende Zeit: {seconds_to_string((100 - last_percent) * seconds_per_percent)}')

        time.sleep(1)
except KeyboardInterrupt:
    print("Exiting")
finally:
    cleanup()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment