Skip to content

Instantly share code, notes, and snippets.

@TheCataliasTNT2k
Last active February 3, 2022 00:11
Show Gist options
  • Save TheCataliasTNT2k/5447b7d0c44b3c66e4765774b2d66026 to your computer and use it in GitHub Desktop.
Save TheCataliasTNT2k/5447b7d0c44b3c66e4765774b2d66026 to your computer and use it in GitHub Desktop.
This script watches your "Bresser REX" or "Flashforge Adventurer 3" for loss of temperature. Happend to me a few times, printer does not recognise it on it's own. This script will pause the printer when this happens. The most part is copied from here: https://github.com/01F0/flashforge-finder-api Caution: Untested yet, because this happens rarely.
import datetime
import re
import socket
import time
from enum import Enum, auto
from os import system
PORT = 8899
HOST = ""
BUFFER_SIZE = 1024
TIMEOUT_SECONDS = 10
request_control_message = '~M601 S1\r\n'
request_info_message = '~M115\r\n'
request_head_position = '~M114\r\n'
request_temp = '~M105\r\n'
request_progress = '~M27\r\n'
request_status = '~M119\r\n'
request_pause = "~M25\r\n"
s = socket.socket()
retries = 0
s.settimeout(TIMEOUT_SECONDS)
def regex_for_field(field_name):
"""Machine Type: Flashforge Finder"""
return field_name + ': ?(.+?)\\r\\n'
def regex_for_coordinates(field_name):
""" X:-19.19 Y:6 Z:7.3 A:846.11 B:0 """
return field_name + ':(.+?) '
def regex_for_current_temperature():
"""T0:210 /210 B:0 /0"""
return 'T0:([0-9].*?) '
def regex_for_target_temperature():
""" T0:210 /210 B:0 /0 """
return '\/([0-9].*?) '
def regex_for_progress():
""" T0:210 /210 B:0 /0 """
return '([0-9].*)\/([0-9].*?)\\r'
def send_and_receive(message_data):
"""Sends and receives data"""
global s, retries
try:
s.send(message_data.encode())
data = s.recv(BUFFER_SIZE)
except Exception as e:
s.connect((HOST, PORT))
if retries >= 10:
print("Retries exceeded!")
exit(1)
retries += 1
return send_and_receive(message_data)
retries = 0
return data.decode()
def get_info():
""" Returns an object with basic printer information such as name etc."""
# send_and_receive(request_control_message)
info_result = send_and_receive(request_info_message)
printer_info = {}
info_fields = ['Type', 'Name', 'Firmware', 'SN', 'X', 'Tool Count']
for field in info_fields:
regex_string = regex_for_field(field)
printer_info[field] = re.search(regex_string, info_result).groups()[0]
return printer_info
def get_head_position():
""" Returns the current x/y/z coordinates of the printer head. """
# send_and_receive(request_control_message)
info_result = send_and_receive(request_head_position)
printer_info = {}
printer_info_fields = ['X', 'Y', 'Z']
for field in printer_info_fields:
regex_string = regex_for_coordinates(field)
printer_info[field] = re.search(regex_string, info_result).groups()[0]
return printer_info
def get_temp():
""" Returns printer temp. Both targeted and current. """
# send_and_receive(request_control_message)
info_result = send_and_receive(request_temp)
regex_temp = regex_for_current_temperature()
regex_target_temp = regex_for_target_temperature()
temp = re.search(regex_temp, info_result).groups()[0]
target_temp = re.search(regex_target_temp, info_result).groups()[0]
return {'Temperature': temp, 'TargetTemperature': target_temp}
def get_progress():
# send_and_receive(request_control_message)
info_result = send_and_receive(request_progress)
regex_groups = re.search(regex_for_progress(), info_result).groups()
printed = int(regex_groups[0])
total = int(regex_groups[1])
if total == 0:
percentage = 0
else:
percentage = int(float(printed) / total * 100)
return {'BytesPrinted': printed,
'BytesTotal': total,
'PercentageCompleted': percentage}
def get_status():
""" Returns the current printer status. """
# send_and_receive(request_control_message)
info_result = send_and_receive(request_status)
printer_info = {}
printer_info_fields = ['Status', 'MachineStatus', 'MoveMode', 'Endstop']
for field in printer_info_fields:
regex_string = regex_for_field(field)
printer_info[field] = re.search(regex_string, info_result).groups()[0]
return printer_info
def pause():
""" Returns the current printer status. """
# send_and_receive(request_control_message)
info_result = send_and_receive(request_pause)
return info_result
def notification(message):
system(
f"sudo -u $USER DISPLAY=:0 DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/1000/bus notify-send '3D Drucker:' '{message}' -u critical")
def seconds_to_string(seconds: float):
return str(datetime.timedelta(seconds=seconds // 1))
def cleanup():
global s
s.close()
class State(Enum):
IDLE = auto()
HEATING = auto()
HEATED = auto()
PAUSED = auto()
state: State = State.IDLE
last_percent = 0
last_temp = 0
if "success" not in send_and_receive(request_control_message).lower():
notification("Programm abgebrochen!")
cleanup()
exit(1)
print("Programm gestarted!")
try:
while True:
status = get_status()
progress = get_progress()
head = get_head_position()
temp = {k: int(v) for k, v in get_temp().items()}
if status["MachineStatus"] == "READY":
print("IDLE")
if state == State.IDLE and status["MachineStatus"] != "READY":
state = State.HEATING
print("Starte Aufheizen")
if state == state.HEATING:
print(f'HEATING {temp["Temperature"]} / {temp["TargetTemperature"]}')
if state == State.HEATING and temp["Temperature"] >= temp["TargetTemperature"] - 5 and temp["TargetTemperature"] > 100:
print("Temperatur erreicht!")
state = State.HEATED
started_printing = time.time()
if state == State.HEATED and temp["Temperature"] < temp["TargetTemperature"] - 20 and progress["PercentageCompleted"] < 98 and temp["TargetTemperature"] > 100:
print(temp)
print("PAUSED")
pause()
notification("Extruder hat Temperatur verloren, pausiere!")
state = State.PAUSED
last_temp = temp["Temperature"]
if state == State.PAUSED and last_temp < temp["Temperature"]:
state = State.HEATING
if state == State.PAUSED:
last_temp = temp["Temperature"]
if state != State.IDLE and status["MachineStatus"] == "READY":
notification("Drucken fertig")
print('Druck abgeschlossen!')
cleanup()
exit(0)
if state == State.HEATED:
# do not try to estimate the remaining time based on percent, it does not work!
if progress["PercentageCompleted"] > last_percent:
last_percent = progress["PercentageCompleted"]
print(f'{last_percent}% abgeschlossen; Temperatur: {temp["Temperature"]} / {temp["TargetTemperature"]}; Head: f{head}')
time.sleep(1)
except KeyboardInterrupt:
print("Exiting")
except Exception as e:
print(e)
notification("Programm abgestürzt!")
finally:
print("Exiting")
cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment