Skip to content

Instantly share code, notes, and snippets.

@Gadgetoid
Last active May 18, 2018 13:21
Show Gist options
  • Save Gadgetoid/9bacc46f48454730d24a7f001613f4a8 to your computer and use it in GitHub Desktop.
Save Gadgetoid/9bacc46f48454730d24a7f001613f4a8 to your computer and use it in GitHub Desktop.
PiratePython: Scrollbot Web API
#!/usr/bin/env python
import scrollphathd
from scrollphathd.fonts import font5x7
from flask import Flask, request
import threading
import queue
import signal
import time
from zeroconf import ServiceInfo, Zeroconf
import socket
import struct
import fcntl
# Flag shared with the display thread: mainloop() sets it to True when it
# starts, and signal_handler() sets it back to False to stop the loop.
running = False
class ImprovedQueue(queue.Queue):
    """A ``queue.Queue`` whose pending items can be inspected in place."""

    def to_list(self):
        """Return a snapshot list of the queued items without consuming them."""
        # Hold the queue's own mutex while copying the underlying container.
        with self.mutex:
            snapshot = list(self.queue)
        return snapshot
# Pending messages: filled by the /message route, drained by mainloop(),
# and listed (without being consumed) by the / route.
q = ImprovedQueue()
def get_ip_address(ifname):
    """Return the IPv4 address assigned to a network interface.

    Args:
        ifname: Interface name, e.g. ``b'wlan0'``. ``str`` names are accepted
            too and encoded to bytes. Only the first 15 bytes are used.

    Returns:
        The address as a dotted-quad string, or ``None`` when the lookup
        fails with ``OSError``.
    """
    if isinstance(ifname, str):
        # struct.pack('256s', ...) needs bytes; a str would raise
        # struct.error, which the OSError handler below does not catch.
        ifname = ifname.encode()

    try:
        # The context manager closes the socket on every path, so repeated
        # lookups do not leak file descriptors.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            ifreq = fcntl.ioctl(
                s.fileno(),
                0x8915,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15])
            )
    except OSError:
        return None

    # The four address bytes are taken from offset 20 of the ioctl result.
    return socket.inet_ntoa(ifreq[20:24])
def mainloop():
    """Show queued messages on the Scroll pHAT HD until ``running`` is cleared.

    Each message taken from ``q`` is drawn, held for a second, scrolled one
    column at a time, held for another second and then cleared. The
    module-level ``running`` flag is set to True on entry; the loop ends once
    something else sets it back to False.
    """
    global running

    scrollphathd.rotate(degrees=180)
    scrollphathd.clear()
    scrollphathd.show()

    running = True
    while running:
        scrollphathd.clear()
        try:
            # Block for at most a second: a new message is picked up as soon
            # as it arrives, while ``running`` is still re-checked every
            # second when the queue stays empty.
            status = q.get(timeout=1)
        except queue.Empty:
            continue

        # Draw the text once (it used to be drawn twice with identical
        # arguments). The return value minus the display width, minus one,
        # is used as the number of single-column scroll steps.
        status_length = scrollphathd.write_string(
            status, x=0, y=0, font=font5x7, brightness=0.1
        ) - scrollphathd.width - 1

        # Show string at start
        scrollphathd.show()
        # Pause for a second
        time.sleep(1)

        # Scroll through the string
        while status_length > 0:
            scrollphathd.scroll(1)
            scrollphathd.show()
            status_length -= 1
            time.sleep(0.02)

        # Pause again at the end
        time.sleep(1)

        # Clear the display
        scrollphathd.clear()
        scrollphathd.show()

        q.task_done()
# Zeroconf instance used to register the services at startup and to
# unregister them again in signal_handler().
zeroconf = Zeroconf()
# Flask application that serves the routes defined below.
app = Flask(__name__)
# Worker thread running mainloop(); it is started at the bottom of the script.
t_show = threading.Thread(target=mainloop)
def signal_handler(signal, frame):
    """Shut down cleanly when SIGINT or SIGTERM is received.

    Unregisters every service in ``services``, closes the Zeroconf instance,
    asks the display thread to stop and waits for it, then exits the process
    with status 0.

    Args:
        signal: Number of the received signal (unused). Inside this function
            the name shadows the ``signal`` module.
        frame: Interrupted stack frame (unused).
    """
    global running
    # ``sys`` is not among the file's top-level imports; without this import
    # the final ``sys.exit`` call raises NameError.
    import sys

    for service in services:
        print("Unregistering:", service)
        zeroconf.unregister_service(service)
    zeroconf.close()

    if running:
        print("Stopping Queue")
        # mainloop() re-checks this flag on every iteration.
        running = False
        t_show.join()

    sys.exit(0)
@app.route('/')
def hello_world():
    """Render an HTML page listing the messages still waiting in the queue.

    Queued messages originate from the ``text`` query parameter accepted by
    the ``/message`` route, so each one is HTML-escaped before it is placed
    in the page.

    Returns:
        str: The HTML body of the index page.
    """
    # Local import: ``html`` is not among the file's top-level imports.
    from html import escape

    pending = "<li>".join(escape(item) for item in q.to_list())
    return """
<h1>Scrollbot Pending</h1>
<ul><li>{queue}</ul>
""".format(queue=pending)
@app.route('/message')
def message():
    """Queue the ``text`` query parameter of the request for display.

    Returns:
        str: ``'ok'`` when a message was queued, ``'no text!'`` when the
        request carried no ``text`` parameter.
    """
    incoming = request.args.get('text')
    if incoming is None:
        return 'no text!'
    q.put(incoming)
    return 'ok'
# Advertise the web API and SSH via Zeroconf when wlan0 has an address.
addr = get_ip_address(b'wlan0')
if addr is None:
    # get_ip_address() returns None on failure and socket.inet_aton(None)
    # raises TypeError, so skip advertising instead of crashing at startup.
    # The web API itself still starts below.
    print("Could not determine the wlan0 address; skipping Zeroconf registration")
    services = ()
else:
    packed_addr = socket.inet_aton(addr)
    services = (
        ServiceInfo(
            "_http._tcp.local.",
            "PiratePython HTTP._http._tcp.local.",
            packed_addr, 80, 0, 0, {},
            "piratepython.local."),
        ServiceInfo(
            "_sftp-ssh._tcp.local.",
            "PiratePython SSH._sftp-ssh._tcp.local.",
            packed_addr, 22, 0, 0, {},
            "piratepython.local."),
    )

for service in services:
    zeroconf.register_service(service)

# Route SIGINT/SIGTERM to signal_handler() so the services are unregistered
# and the display thread is stopped on shutdown.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

t_show.start()
# The port is passed as an integer rather than the string '80'.
app.run(host='0.0.0.0', port=80, debug=False, use_reloader=False)
#!/usr/bin/env python
"""
This example is for MicroDot pHAT - up to 20171106x you'll need to add the MicroDot pHAT library folder into /libraries
"""
import microdotphat
from flask import Flask, request
import threading
import queue
import signal
import time
from zeroconf import ServiceInfo, Zeroconf
import socket
import struct
import fcntl
# Flag shared with the display thread: mainloop() sets it to True when it
# starts, and signal_handler() sets it back to False to stop the loop.
running = False
class ImprovedQueue(queue.Queue):
    """A ``queue.Queue`` whose pending items can be inspected in place."""

    def to_list(self):
        """Return a snapshot list of the queued items without consuming them."""
        # Hold the queue's own mutex while copying the underlying container.
        with self.mutex:
            snapshot = list(self.queue)
        return snapshot
# Pending messages: filled by the /message route, drained by mainloop(),
# and listed (without being consumed) by the / route.
q = ImprovedQueue()
def get_ip_address(ifname):
    """Return the IPv4 address assigned to a network interface.

    Args:
        ifname: Interface name, e.g. ``b'wlan0'``. ``str`` names are accepted
            too and encoded to bytes. Only the first 15 bytes are used.

    Returns:
        The address as a dotted-quad string, or ``None`` when the lookup
        fails with ``OSError``.
    """
    if isinstance(ifname, str):
        # struct.pack('256s', ...) needs bytes; a str would raise
        # struct.error, which the OSError handler below does not catch.
        ifname = ifname.encode()

    try:
        # The context manager closes the socket on every path, so repeated
        # lookups do not leak file descriptors.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            ifreq = fcntl.ioctl(
                s.fileno(),
                0x8915,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15])
            )
    except OSError:
        return None

    # The four address bytes are taken from offset 20 of the ioctl result.
    return socket.inet_ntoa(ifreq[20:24])
def mainloop():
    """Show queued messages on the Micro Dot pHAT until ``running`` is cleared.

    Each message taken from ``q`` is drawn, held for a second, scrolled one
    column at a time, held for another second and then cleared. The
    module-level ``running`` flag is set to True on entry; the loop ends once
    something else sets it back to False.
    """
    global running

    microdotphat.clear()
    microdotphat.show()

    running = True
    while running:
        microdotphat.clear()
        try:
            # Block for at most a second: a new message is picked up as soon
            # as it arrives, while ``running`` is still re-checked every
            # second when the queue stays empty.
            status = q.get(timeout=1)
        except queue.Empty:
            continue

        # Draw the text once (it used to be drawn twice with identical
        # arguments). The return value minus 46 is used as the number of
        # single-column scroll steps.
        # NOTE(review): 46 is presumably the visible display width plus
        # one - confirm against the microdotphat library.
        status_length = microdotphat.write_string(status) - 46

        # Show string at start
        microdotphat.show()
        # Pause for a second
        time.sleep(1)

        # Scroll through the string
        while status_length > 0:
            microdotphat.scroll(1)
            microdotphat.show()
            status_length -= 1
            time.sleep(0.02)

        # Pause again at the end
        time.sleep(1)

        # Clear the display
        microdotphat.clear()
        microdotphat.show()

        q.task_done()
# Zeroconf instance used to register the services at startup and to
# unregister them again in signal_handler().
zeroconf = Zeroconf()
# Flask application that serves the routes defined below.
app = Flask(__name__)
# Worker thread running mainloop(); it is started at the bottom of the script.
t_show = threading.Thread(target=mainloop)
def signal_handler(signal, frame):
    """Shut down cleanly when SIGINT or SIGTERM is received.

    Unregisters every service in ``services``, closes the Zeroconf instance,
    asks the display thread to stop and waits for it, then exits the process
    with status 0.

    Args:
        signal: Number of the received signal (unused). Inside this function
            the name shadows the ``signal`` module.
        frame: Interrupted stack frame (unused).
    """
    global running
    # ``sys`` is not among the file's top-level imports; without this import
    # the final ``sys.exit`` call raises NameError.
    import sys

    for service in services:
        print("Unregistering:", service)
        zeroconf.unregister_service(service)
    zeroconf.close()

    if running:
        print("Stopping Queue")
        # mainloop() re-checks this flag on every iteration.
        running = False
        t_show.join()

    sys.exit(0)
@app.route('/')
def hello_world():
    """Render an HTML page listing the messages still waiting in the queue.

    Queued messages originate from the ``text`` query parameter accepted by
    the ``/message`` route, so each one is HTML-escaped before it is placed
    in the page.

    Returns:
        str: The HTML body of the index page.
    """
    # Local import: ``html`` is not among the file's top-level imports.
    from html import escape

    pending = "<li>".join(escape(item) for item in q.to_list())
    return """
<h1>Scrollbot Pending</h1>
<ul><li>{queue}</ul>
""".format(queue=pending)
@app.route('/message')
def message():
    """Queue the ``text`` query parameter of the request for display.

    Returns:
        str: ``'ok'`` when a message was queued, ``'no text!'`` when the
        request carried no ``text`` parameter.
    """
    incoming = request.args.get('text')
    if incoming is None:
        return 'no text!'
    q.put(incoming)
    return 'ok'
# Advertise the web API and SSH via Zeroconf when wlan0 has an address.
addr = get_ip_address(b'wlan0')
if addr is None:
    # get_ip_address() returns None on failure and socket.inet_aton(None)
    # raises TypeError, so skip advertising instead of crashing at startup.
    # The web API itself still starts below.
    print("Could not determine the wlan0 address; skipping Zeroconf registration")
    services = ()
else:
    packed_addr = socket.inet_aton(addr)
    services = (
        ServiceInfo(
            "_http._tcp.local.",
            "PiratePython HTTP._http._tcp.local.",
            packed_addr, 80, 0, 0, {},
            "piratepython.local."),
        ServiceInfo(
            "_sftp-ssh._tcp.local.",
            "PiratePython SSH._sftp-ssh._tcp.local.",
            packed_addr, 22, 0, 0, {},
            "piratepython.local."),
    )

for service in services:
    zeroconf.register_service(service)

# Route SIGINT/SIGTERM to signal_handler() so the services are unregistered
# and the display thread is stopped on shutdown.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

t_show.start()
# The port is passed as an integer rather than the string '80'.
app.run(host='0.0.0.0', port=80, debug=False, use_reloader=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment