Skip to content

Instantly share code, notes, and snippets.

@keyz182
Last active April 20, 2020 13:03
Show Gist options
  • Save keyz182/2dd561745c41063fa4c0d4e039e09039 to your computer and use it in GitHub Desktop.
Save keyz182/2dd561745c41063fa4c0d4e039e09039 to your computer and use it in GitHub Desktop.
3D Print notification - SMS + image url
"""
notifier.py
Listens to a Duet over USB
Configure your slicer to add M118 P1 S"###END###" to the end of your GCode
Ensure "port" points to the Duet serial port
Configure the following parameters:
- twilio_account_sid - from https://www.twilio.com/console/project/settings
- twilio_auth_token - from https://www.twilio.com/console/project/settings
- s3_bucket - your S3 bucket name
- s3_url - Add in the region instead of ####
- from_number - the number twilio gives you to send from
- to_number - your number
- mjpeg_streamer_url - URL to get a snapshot from your camera (tested with mjpeg_streamer https://github.com/jacksonliam/mjpg-streamer)
For AWS Auth follow this guide - https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration
Run `pip install twilio boto3 systemd requests`
Notes:
Currently a bit overly chatty, and filled up the root partition on my Pi with logs when Duet was disconnected.
For now, added logrotate `size 512k` and set it to rotate every 5 minutes.
TODO: specifically handle errors for serial port not found and in use, add time.sleep, and don't log the stack trace to clean up logs
"""
import serial, time, json, io, os, requests, datetime, sys, traceback, logging
from systemd import journal
from twilio.rest import Client
import boto3
log = logging.getLogger('timelapse')
log.addHandler(journal.JournaldLogHandler())
log.setLevel(logging.INFO)
port = '/dev/serial/by-id/usb-Duet3D_Duet-if00'
baud = 115200
twilio_account_sid = "####"
twilio_auth_token = "####"
s3_bucket = "####"
s3_url = "https://{}.s3.####.amazonaws.com".format(s3_bucket)
mjpeg_streamer_url = "http://127.0.0.1:8080/?action=snapshot"
from_number = '####'
to_number = '####'
ser = serial.Serial(port, baud)
s3 = boto3.resource('s3')
def log_print(*msg, file=sys.stdout):
dt = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
print(dt, *msg, file=file)
out = '{} {}'.format(dt, ' '.join([str(m) for m in msg]))
log.info(out)
def err_print(*msg, file=sys.stdout):
dt = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
print(dt, *msg, file=file)
out = '{} {}'.format(dt, ' '.join([str(m) for m in msg]))
log.exception(out)
log_print(ser.name)
twilio_client = Client(twilio_account_sid, twilio_auth_token)
while True:
try:
tdata = ser.read() # Wait forever for anything
time.sleep(1) # Sleep (or inWaiting() doesn't give the correct value)
data_left = ser.inWaiting() # Get the number of characters ready to be read
tdata += ser.read(data_left) # Do the read and combine it with the first character
log_print(tdata)
if '###END###' in tdata.decode():
try:
# TODO: Look at M408 or similar to get more info?
r = requests.get(mjpeg_streamer_url, timeout=5)
r.raise_for_status()
filename = "{}.jpg".format(datetime.datetime.now().strftime("%Y-%m-%d"))
s3.Bucket(s3_bucket).put_object(Key=filename,Body=r.content, ACL='public-read', ContentType='image/jpeg')
url = "{}/{}".format(s3_url, filename)
message = twilio_client.messages.create(
body="Print Finished!",
media_url=[url],
from_=from_number,
to=to_number
)
log_print("Sent MMS {}".format(message.sid))
except Exception as e:
err_print("Error sending whatsapp with image!")
message = twilio_client.messages.create(to=to_number,from_=from_number,body="Print Finished!")
log_print("Sent SMS {}".format(message.sid))
except serial.serialutil.SerialException as e:
try:
ser = serial.Serial(port, baud)
print(ser.name)
except serial.serialutil.SerialException as ex:
err_print('Somethings buggered up serially yo')
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
traceback.print_exception(exc_type, exc_value, exc_traceback,limit=2, file=sys.stdout)
time.sleep(10)
except Exception as e:
err_print('Somethings buggered up yo')
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
traceback.print_exception(exc_type, exc_value, exc_traceback,limit=2, file=sys.stdout)
time.sleep(10)
[Unit]
Description=Print Notifier
DefaultDependencies=no
After=network.target
[Service]
Type=simple
User=pi
Group=pi
ExecStart=/usr/bin/python3 /home/pi/notifier.py
TimeoutStartSec=0
RemainAfterExit=yes
[Install]
WantedBy=default.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment