Skip to content

Instantly share code, notes, and snippets.

@euphy
Last active May 6, 2022 18:56
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save euphy/75366a70346689190bb5 to your computer and use it in GitHub Desktop.
Save euphy/75366a70346689190bb5 to your computer and use it in GitHub Desktop.
Polargraph queue sender (low resource)
#!/usr/bin/python
"""
MIT License
Copyright (c) 2022 Sandy Noble
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Only python package required is pyserial:
pip install pyserial
Use this wee script to send a command queue to a Polargraph machine.
If the filename is "mycommandqueue.txt" and the machine is connected on COM7, then do:
python send.py mycommandqueue.txt COM7
"""
import os
import serial
import time
import sys
BAUD_RATE = 57600
class Polargraph():
"""
This is a crude model of a drawing machine. It includes it's drawing state
as well as the state of the communications lines to the machine and the
queue of commands going to it.
"""
def __init__(self):
self.time_started = time.time()
serial_port = None
file = None
# State
ready = False
file_position = 0
total_lines = 0
def start_serial_comms(self, comm_port, baud_rate=57600):
"""
Attempts to connect this machine to it's comm_port. It starts two
threads, one for reading that is attached to the 'received_log'
list, and one thread for writing that is attached to the main outgoing
command queue.
:param comm_port: The name of the comm port
:param baud_rate: The speed to open the serial connection at
:return:
"""
try:
self.serial_port = serial.Serial(comm_port, baudrate=baud_rate)
print "Connected successfully to %s (%s)." % (COMM_PORT, serial)
return True
except Exception as e:
print("Oh there was an exception loading the port %s: %s" % (comm_port, e))
print e.message
self.serial_port = None
return False
def load_file(self, filename):
"""
Checks that the file exists, opens it and counts the lines in it.
:param filename:
:return:
"""
if os.path.isfile(filename):
print "Found %s!" % os.path.abspath(filename)
self.file = open(filename, 'r')
# Lets have a quick review of this file
lines = 0
for lines, l in enumerate(self.file):
pass
self.total_lines = lines + 1
print "This file has %s lines." % self.total_lines
# reset the file position
self.file.seek(0)
else:
self.file = None
print "File %s couldn't be found!" % os.path.abspath(filename)
def read_line(self):
l = self.serial_port.readline().strip()
print "received: {:s}".format(l)
if l.startswith("READY"):
# if it's changing from not ready to ready, then it's just finished a command
if not self.ready and self.file_position > 0: # ie it's not the first one
time_ran = time.time() - self.time_started
time_per_command = time_ran / (self.file_position)
time_projected = self.total_lines * time_per_command
time_left = time_projected - time_ran
m, s = divmod(time_left, 60)
h, m = divmod(m, 60)
d, h = divmod(h, 24)
print "Ran {:d} commands in {:0.2f} seconds: " \
"at {:0.2f} seconds per command, we'll finish in {:02d}:{:02d}:{:02d})".format(self.file_position,
time_ran,
time_per_command,
int(h), int(m), int(s))
self.ready = True
def write_line(self):
if self.ready and self.file:
l = self.file.readline().strip()
self.file_position += 1
print "Command {}/{}: {} ({:.0%})".format(self.file_position, self.total_lines, l, (float(self.file_position) / float(self.total_lines)))
self.serial_port.write(l + "\n")
self.ready = False
def commands_queued(self):
return self.total_lines - self.file_position
def close(self):
print "Finished sending {:d} commands in {:0.2f}".format(self.total_lines, time.time() - self.time_started)
self.file.close()
def main():
# Assemble a Polargraph object, load a file, open the comms port.
polargraph = Polargraph()
polargraph.load_file(INPUT_FILENAME)
opened = polargraph.start_serial_comms(comm_port=COMM_PORT)
if not opened:
print "There was a problem opening the communications port. It should be entered exactly as you see it in" \
"your operating system."
exit()
while polargraph.commands_queued():
polargraph.write_line()
polargraph.read_line()
polargraph.close()
if __name__ == "__main__":
print len(sys.argv)
if len(sys.argv) == 3:
INPUT_FILENAME = sys.argv[1]
COMM_PORT = sys.argv[2]
else:
print "Supply filename and serial port as parameters."
print "eg: python send.py myqueue.txt COM7"
exit()
main()
@ozgurgulsuna
Copy link

ozgurgulsuna commented Aug 8, 2018

I was just tinkering with raspberry pi and this queue sender and I thought wouldn't it be awesome if it had a function to shutdown raspi at the end of the drawing.
I added little codey thingy just after this line

  polargraph.close()
  call("sudo shutdown -h now", shell=True)

and little bit tweaking the beginning of it and added this import line

  from subprocess import call

and blam !
now it shuts the pi down after the drawing has finished.

edit: shut down also disconnects arduino from pi so basically it deactivates steppers.

@V4N63L10N
Copy link

V4N63L10N commented Jan 16, 2019

Found C:\Users\stephanie\mycommands.txt!
This file has 351 lines.
Connected successfully to COM3 (<module 'serial' from 'C:\Users\stephanie\AppData\Local\Programs\Python\Python37-32\lib\site-packages\serial\init.py'>).
received: {:s}
Traceback (most recent call last):
File "send.py", line 160, in
main()
File "send.py", line 145, in main
polargraph.read_line()
File "send.py", line 96, in read_line
print ("received: {:s}").format(l)
AttributeError: 'NoneType' object has no attribute 'format'

i keep getting this error please help i need an offline polargraph controller the polargraphSD isn't available on our country

@DaveCalaway
Copy link

Python3 edited:

#!/usr/bin/python3

"""
Only python package required is pyserial:

    pip install pyserial

Use this wee script to send a command queue to a Polargraph machine.

If the filename is "mycommandqueue.txt" and the machine is connected on COM7, then do:

    python send.py mycommandqueue.txt COM7
    
    
"""


import os
import serial
import time

import sys

BAUD_RATE = 57600


class Polargraph():
    """
    This is a crude model of a drawing machine. It includes it's drawing state
    as well as the state of the communications lines to the machine and the
    queue of commands going to it.

    """

    def __init__(self):
        self.time_started = time.time()

    serial_port = None
    file = None

    # State
    ready = False
    file_position = 0
    total_lines = 0

    def start_serial_comms(self, comm_port, baud_rate=57600):
        """
        Attempts to connect this machine to it's comm_port. It starts two
        threads, one for reading that is attached to the 'received_log'
        list, and one thread for writing that is attached to the main outgoing
        command queue.

        :param comm_port: The name of the comm port
        :param baud_rate: The speed to open the serial connection at

        :return:
        """
        try:
            self.serial_port = serial.Serial(comm_port, baudrate=baud_rate)
            print ("Connected successfully to %s (%s)." % (COMM_PORT, serial))
            return True

        except Exception as e:
            print("Oh there was an exception loading the port %s: %s" % (comm_port, e))
            print (e.message)
            self.serial_port = None
            return False

    def load_file(self, filename):
        """
        Checks that the file exists, opens it and counts the lines in it.

        :param filename:
        :return:
        """

        if os.path.isfile(filename):
            print ("Found %s!" % os.path.abspath(filename))
            self.file = open(filename, 'r')

            # Lets have a quick review of this file
            lines = 0
            for lines, l in enumerate(self.file):
                pass
            self.total_lines = lines + 1
            print ("This file has %s lines." % self.total_lines)

            # reset the file position
            self.file.seek(0)
        else:
            self.file = None
            print ("File %s couldn't be found!" % os.path.abspath(filename))

    def read_line(self):
        l = self.serial_port.readline().decode('ascii')
        l.strip()
        print("received: {:s}".format(l))
        if l.startswith("READY"):
            # if it's changing from not ready to ready, then it's just finished a command
            if not self.ready and self.file_position > 0:  # ie it's not the first one
                time_ran = time.time() - self.time_started
                time_per_command = time_ran / (self.file_position)
                time_projected = self.total_lines * time_per_command
                time_left = time_projected - time_ran

                m, s = divmod(time_left, 60)
                h, m = divmod(m, 60)
                d, h = divmod(h, 24)

                print ("Ran {:d} commands in {:0.2f} seconds: " \
                      "at {:0.2f} seconds per command, we'll finish in {:02d}:{:02d}:{:02d})".format(self.file_position,
                                                                         time_ran,
                                                                         time_per_command,
                                                                         int(h), int(m), int(s)))

            self.ready = True

    def write_line(self):
        if self.ready and self.file:
            l = self.file.readline().strip()
            self.file_position += 1
            print ("Command {}/{}: {} ({:.0%})".format(self.file_position, self.total_lines, l, (float(self.file_position) / float(self.total_lines))))
            self.serial_port.write((l + "\n").encode())
            self.ready = False

    def commands_queued(self):
        return self.total_lines - self.file_position

    def close(self):
        print ("Finished sending {:d} commands in {:0.2f}".format(self.total_lines, time.time() - self.time_started))
        self.file.close()


def main():
    # Assemble a Polargraph object, load a file, open the comms port.
    polargraph = Polargraph()
    polargraph.load_file(INPUT_FILENAME)

    opened = polargraph.start_serial_comms(comm_port=COMM_PORT)
    if not opened:
        print ("There was a problem opening the communications port. It should be entered exactly as you see it in" \
              "your operating system.")
        exit()
    while polargraph.commands_queued():
        polargraph.write_line()
        polargraph.read_line()

    polargraph.close()

if __name__ == "__main__":
    print (len(sys.argv))
    if len(sys.argv) == 3:
        INPUT_FILENAME = sys.argv[1]
        COMM_PORT = sys.argv[2]
    else:
        print ("Supply filename and serial port as parameters.")
        print ("eg: python send.py myqueue.txt COM7")
        exit()

    main()

@majabojarska
Copy link

Hi @euphy. Under what license is this script distributed? I'm thinking about redistributing a fork as a Python package via PyPI.

Thanks,
Maja

@euphy
Copy link
Author

euphy commented May 1, 2022

Hi Majabojarska, added MIT license - plenty permissive, good luck, would love to see the fork!

@majabojarska
Copy link

Thanks @euphy!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment