Last active
April 3, 2024 21:00
-
-
Save rwbot/e45d93ab3a9240f524434b3f97767684 to your computer and use it in GitHub Desktop.
ros3dp diff manager
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from matplotlib.font_manager import json_dump | |
import pyudev | |
from pyudev import Context, Device, Monitor, MonitorObserver | |
import os | |
import roslaunch | |
import rospy | |
import sys | |
import subprocess | |
import threading | |
from fb_handler import FirebaseHandler | |
from ros3dp.msg import Pulse, Heartbeat, NodeInfo, PrinterCommand, Temperatures | |
from collections import deque | |
import json | |
import websocket | |
import datetime | |
import signal | |
import copy | |
import rosnode | |
from std_msgs.msg import std_msgs | |
class PrintManager(): | |
def __init__(self): | |
self.nodeId = rospy.get_param("nodeId") | |
self.nodeHostname = rospy.get_param("hostname") | |
self.nodeSerial = rospy.get_param("nodeSerial") | |
# self.slantWebSocketURL = rospy.get_param("slantWebSocketURL") | |
self.heartbeat_template_file = rospy.get_param("heartbeatTemplate") | |
self.heartbeat_interval = rospy.get_param("heartbeatInterval") | |
self.pulse_interval = rospy.get_param("pulseInterval") | |
self.pulseQueue = deque() | |
self.pulsesDict: dict = {} | |
self.nodeInfo = NodeInfo() | |
self.heartbeat = Heartbeat(std_msgs.msg.Header(), self.nodeInfo, []) | |
self.fbh = FirebaseHandler() | |
self.command_pub = rospy.Publisher('/printer_command', PrinterCommand, queue_size=10) | |
self.heartbeat_pub = rospy.Publisher('/heartbeat', Heartbeat, queue_size=10) | |
rospy.sleep(2) # wait for everything to initialize | |
def pulse_callback(self, msg): | |
rospy.logdebug(f"Pulse received from {msg.serial_num} at {self.toDatetime(msg.timestamp)}") | |
self.pulseQueue.append(msg) | |
# rospy.logdebug(f"All Pulses: {self.pulses}") | |
def updatePulses(self): | |
rospy.loginfo(f"Updating pulses") | |
while len(self.pulseQueue) > 0: | |
currentPulse = self.pulseQueue.popleft() | |
try: | |
self.pulsesDict[currentPulse.serial_num] = currentPulse | |
except Exception as e: | |
rospy.logerr(f"Error updating pulses: {e}") | |
def publish_heartbeat(self,event): | |
rospy.loginfo("Publishing heartbeat") | |
temp = copy.deepcopy(self.pulsesDict) | |
self.pulsesDict = {} | |
self.heartbeat.printers.clear() | |
nodes = rosnode.get_node_names() | |
for p in temp: | |
self.heartbeat.printers.append(temp[p]) | |
self.heartbeat.header.stamp = rospy.Time.now() | |
self.heartbeat_pub.publish(self.heartbeat) | |
rospy.loginfo(f"Publishing heartbeat {self.heartbeat}") | |
def publish_command(self, cmd: PrinterCommand): | |
rospy.logwarn("Publishing printer command") | |
file_path = self.download_gcode(cmd.file_name) | |
rospy.logwarn(f"Gcode downloaded to path: {file_path}") | |
if os.path.exists(file_path): | |
rospy.logwarn(f"Publishing command: {cmd}") | |
self.command_pub.publish(cmd) | |
else: | |
rospy.logerr(f"Error downloading gcode") | |
def download_gcode(self, file_name): | |
# download gcode from Firebase | |
rospy.logdebug(f"Attempting to download {file_name} from Firebase") | |
try: | |
if self.fbh.connected: | |
file_path = self.fbh.get_gcode(file_name) | |
return file_path | |
else: | |
rospy.logerr("Firebase not connected") | |
return False | |
except Exception as e: | |
rospy.logerr(f"Error downloading gcode: {e}") | |
def startPrint(self, msg): | |
serial: str = stuff["serial"] | |
printerId: int = stuff["printerId"] | |
file: str = stuff["file"] | |
print(f"Starting print for printer {printerId} with file {file}") | |
cmd = PrinterCommand() | |
cmd.command = "start_print" | |
cmd.serial_num = serial | |
cmd.file_name = file | |
self.publish_command(cmd) | |
def exit_handler(self, signum, frame): | |
rospy.sleep(2) | |
sys.exit() | |
def toDatetime(self, rosTime): | |
return datetime.datetime.fromtimestamp(rosTime.to_sec(), datetime.timezone.utc) | |
def toRostime(self, datetime): | |
return rospy.Time.from_sec(datetime.timestamp()) | |
def main(): | |
# sys.argv = rospy.myargv(argv=sys.argv) | |
rospy.init_node("print_manager") | |
rospy.logwarn(f"{rospy.get_param('nodeId')} Print Manager started with heartbeat interval {rospy.get_param('heartbeatInterval')}") | |
rospy.logwarn(f"Print Manager started with heartbeat interval") | |
pm = PrintManager() | |
rospy.loginfo(f"Print Manager started with heartbeat interval {pm.heartbeat_interval}") | |
rospy.Timer(rospy.Duration(pm.heartbeat_interval), pm.publish_heartbeat) | |
pulseSub = rospy.Subscriber('/pulse', Pulse, pm.pulse_callback) | |
signal.signal(signal.SIGINT,pm.exit_handler) # Ctrl+C | |
signal.signal(signal.SIGTSTP,pm.exit_handler) # Ctrl+Z | |
while not rospy.is_shutdown(): | |
# do whatever you want here | |
pm.updatePulses() | |
rospy.sleep(1) # sleep for one second | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment