Last active
May 13, 2021 17:33
-
-
Save graybilldustin/9493d6491c7ad202e2a2d023257f9416 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import requests | |
import math | |
import tkinter as tk | |
from tkinter import * | |
from tkinter.ttk import Progressbar | |
import matplotlib.pyplot as plt | |
from matplotlib.backends.backend_tkagg import (FigureCanvasTkAgg) | |
import logging | |
# User configuration items | |
config = dict() | |
config['root_url'] = "http://10.1.100.2" # Base URL of printer addresses | |
config['printer_count'] = 10 # Quantity of printers to display | |
config['width'] = 4 # Quantity of columns for display grid | |
config['height'] = 3 # Quantity of rows for display grid | |
config['refresh_int'] = 5 # How often to refresh information from printers | |
config['graph_font'] = ("Droid Sans Mono", 20) # Graph font face, font size | |
config['title_font'] = ("Droid Sans Mono", 13) # Print filename font face, font size | |
config['body_font'] = ("Droid Sans Mono", 10) # Print stats font face, font size | |
config['body_color'] = "#000000" | |
config['warning_color'] = "#FF0000" | |
config['stale_refresh_warning_time'] = datetime.timedelta(seconds=10) | |
config['stale_refresh_clear_time'] = datetime.timedelta(minutes=3) | |
# Format X Axis Labels | |
def formatx_func(value, tick_number): | |
return str(abs(10 - int(value)) * config['refresh_int']) + "s" | |
# Format Y Axis Labels | |
def formaty_func(value, tick_number): | |
return str(int(value)) + "°C" | |
# Take printer number and convert it to rows and columns | |
def get_coords(x): | |
return math.floor(x / config['width']), x % config['width'] | |
# Printer class, keeps printer info: url, name, stats, and Tkinter frame | |
class Printer: | |
def __init__(self, name, url): | |
# Name Printer and configure url | |
self.name = name | |
self.url = url | |
# API Call timeout | |
self.timeout = config['refresh_int'] | |
# Empty place for stats | |
self.reset_stats() | |
self.last_refresh_time = None | |
# UI elements | |
self.frame = None | |
self.lbl_title = None | |
self.lbl_prj = None | |
self.progress_bar = None | |
self.lbl_time = None | |
self.lbl_fin = None | |
self.lbl_update = None | |
self.fig = None | |
self.plot = None | |
self.canvas = None | |
def reset_stats(self): | |
self.stats = dict() | |
self.stats['temp_bed'] = [0] * 10 | |
self.stats['temp_nozzle'] = [0] * 10 | |
# Get stats from the API and update the printer's "self.stats" | |
def get_stats(self): | |
temp_temp_bed = self.stats['temp_bed'] | |
temp_temp_nozzle = self.stats['temp_nozzle'] | |
self.reset_stats() | |
self.stats['temp_bed'] = temp_temp_bed | |
self.stats['temp_nozzle'] = temp_temp_nozzle | |
try: | |
# API Call | |
response = requests.get(self.url, timeout=self.timeout) | |
values = dict() | |
# Loop through items in response, if it's a string, run .strip() on it to clean whitespace | |
# If it's not a string, just pass it through | |
for k, v in response.json().items(): | |
if isinstance(v, str): | |
values[k] = v.strip() | |
else: | |
values[k] = v | |
# If we have certain data, save it to the local stats dictionary for use in the UI | |
if 'print_dur' in values: | |
self.stats['print_dur'] = values['print_dur'] | |
if 'progress' in values: | |
self.stats['progress'] = values['progress'] | |
if 'project_name' in values: | |
self.stats['project_name'] = values['project_name'] | |
if 'temp_nozzle' in values: | |
self.stats['temp_nozzle'].append(values['temp_nozzle']) | |
self.stats['temp_nozzle'].pop(0) | |
if 'temp_bed' in values: | |
self.stats['temp_bed'].append(values['temp_bed']) | |
self.stats['temp_bed'].pop(0) | |
time_now = datetime.datetime.now() | |
if 'time_est' in values: | |
self.stats['time_est'] = values['time_est'] | |
self.stats['time_finish'] = time_now + datetime.timedelta(0, int(values['time_est'])) | |
# Log what time the API call was made and finished | |
self.last_refresh_time = time_now | |
except requests.exceptions.RequestException: | |
logging.warning("[{}] Error getting stats".format(self.name)) | |
if (self.last_refresh_time # have we *ever* refreshed? | |
and self.last_refresh_time < datetime.datetime.now() - config['stale_refresh_clear_time'] # was it more than a minute ago? | |
and len(self.stats.keys()) > 2): # is there current data in self.stats, or is it empty? | |
logging.warning("[{}] Haven't refreshed since {}. Clearing stats".format(self.name, self.last_refresh_time)) | |
self.reset_stats() | |
return | |
# Create UI Frame, label, progress bar, and graph elements | |
def configure_frame(self, parent, r, c, start_autorefresh=True): | |
# Frame | |
self.frame = Frame(parent, height=380, width=510, relief=tk.GROOVE, borderwidth=3) | |
self.frame.grid_propagate(0) | |
self.frame.grid(row=r, column=c) | |
# Body | |
self.lbl_prj = Label(self.frame, font=config['title_font']) | |
self.lbl_prj.grid(row=1, column=0, columnspan=3) | |
self.progress_bar = Progressbar(self.frame, orient=HORIZONTAL, mode='determinate', length=400) | |
self.progress_bar.grid(row=2, column=0, columnspan=3) | |
self.lbl_time = Label(self.frame, font=config['body_font']) | |
self.lbl_time.grid(row=3, column=0, sticky=W) | |
self.lbl_fin = Label(self.frame, font=config['body_font']) | |
self.lbl_fin.grid(row=3, column=2, sticky=E) | |
self.lbl_update = Label(self.frame, font=config['body_font']) | |
self.lbl_update.grid(row=3, column=1) | |
self.fig = plt.figure(figsize=(5, 3), dpi=100, facecolor="#F0F0F0") | |
self.plot = self.fig.add_subplot(111) | |
self.canvas = FigureCanvasTkAgg(self.fig, master=self.frame) | |
self.canvas.get_tk_widget().grid(row=0, column=0, columnspan=3) | |
# Schedule first update_frame run | |
if start_autorefresh: | |
self.update_frame() | |
def draw_graph(self): | |
# Update temp graph | |
self.plot.cla() # Close the previous plot | |
self.plot.grid(True) # Enable graph grid | |
self.plot.set_ylim(0, 300) # Set Y axis limit to 300 so it remains a fixed size | |
self.plot.set_title(self.name, fontsize=config['graph_font'][1], fontname=config['graph_font'][0]) # Graph title | |
self.plot.plot(self.stats['temp_bed'], color="green", lw=3, label="Bed Temp") # Plot Bed Temp | |
self.plot.plot(self.stats['temp_nozzle'], color="red", lw=3, label="Nozzle Temp") # Plot Nozzle Temp | |
self.plot.xaxis.set_major_locator(plt.MultipleLocator(1)) # Set X axis increments to 1 | |
self.plot.xaxis.set_major_formatter(plt.FuncFormatter(formatx_func)) # Set X axis labels to # of seconds | |
self.plot.yaxis.set_major_formatter(plt.FuncFormatter(formaty_func)) # Set Y axis labels to degrees C | |
self.plot.legend() # Enable graph legend | |
self.canvas.draw() # Draw graph | |
# Update UI elements | |
def update_frame(self): | |
# Update self.stats | |
self.get_stats() | |
self.draw_graph() | |
# Update File label if it exists, otherwise display "Status: IDLE" | |
self.lbl_prj.configure(text=str(self.stats.get("project_name", "Status: IDLE"))) | |
self.progress_bar.configure(value=self.stats.get("progress", 0)) | |
# Calculate estimated completion time | |
now = datetime.datetime.now() | |
if "print_dur" in self.stats: | |
if self.stats['time_finish'].date() == now.date(): # If we finish today | |
time_finish_text = "Est Finish: {}".format(self.stats['time_finish'].strftime("%I:%M%p")) | |
elif self.stats['time_finish'].date() == now.date() + datetime.timedelta( | |
days=1): # If we finish tomorrow (today + 1) | |
time_finish_text = "Est Finish: Tomorrow {}".format(self.stats['time_finish'].strftime("%I:%M%p")) | |
elif self.stats['time_finish'].date() > now.date() + datetime.timedelta( | |
days=1): # If we finish after tomorrow (> today + 1) | |
time_finish_text = "Est Finish: {}".format(self.stats['time_finish'].strftime("%m/%d %I:%M%p")) | |
else: | |
time_finish_text = "Finish time unknown" # Catch if time format is unclear | |
# Update the UI with our time above | |
self.lbl_fin.configure(text=time_finish_text) | |
self.lbl_time.configure(text="Print Time: {}".format(self.stats.get("print_dur"))) | |
else: | |
self.lbl_fin.configure(text="") | |
self.lbl_time.configure(text="") | |
# refresh our last_refreshed time, with fancy colors in case we start timing out | |
update_color = config['warning_color'] | |
update_text = "No refresh time" | |
if self.last_refresh_time: | |
if self.last_refresh_time > datetime.datetime.now() - config['stale_refresh_warning_time']: | |
update_color = config['body_color'] | |
update_text = self.last_refresh_time.strftime("%H:%M:%S") | |
self.lbl_update.configure(fg=update_color, text=update_text) | |
# Schedule the next update run at configured interval | |
self.frame.after(config['refresh_int']*1000, self.update_frame) | |
if __name__ == "__main__": | |
# Set up our printers | |
printers = list() | |
for index in range(1, config['printer_count'] + 1): | |
# using .format to automatically add leading zeros to make our URL | |
full_url = config['root_url'] + "{:02d}/api/telemetry".format(index) | |
printers.append(Printer("Printer {}".format(str(index)), full_url)) | |
# Create tkinter root element | |
root = Tk() | |
root.title("Print Monitor") | |
# Create frame layout for each printer | |
for index, printer in enumerate(printers): | |
# Where is this printer in the grid | |
row, column = get_coords(index) | |
# set up the UI (and schedule the first update_frame run 5 seconds from now) | |
printer.configure_frame(root, row, column) | |
# Start tkinter mainloop | |
root.mainloop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment