Last active
May 18, 2018 13:21
-
-
Save Gadgetoid/9bacc46f48454730d24a7f001613f4a8 to your computer and use it in GitHub Desktop.
PiratePython: Scrollbot Web API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import scrollphathd | |
from scrollphathd.fonts import font5x7 | |
from flask import Flask, request | |
import threading | |
import queue | |
import signal | |
import time | |
from zeroconf import ServiceInfo, Zeroconf | |
import socket | |
import struct | |
import fcntl | |
# Flag polled by mainloop(): it sets this True when it starts and keeps
# looping until something (the signal handler) sets it back to False.
running = False


class ImprovedQueue(queue.Queue):
    """A ``queue.Queue`` whose pending items can be inspected in place."""

    def to_list(self):
        """Return a snapshot list of the queued items without consuming them.

        The queue's own mutex is held while copying so the snapshot is
        consistent with concurrent ``put``/``get`` calls.
        """
        with self.mutex:
            snapshot = [*self.queue]
        return snapshot


# Messages waiting to be shown on the display.
q = ImprovedQueue()
def get_ip_address(ifname):
    """Return the IPv4 address assigned to a network interface.

    Args:
        ifname: Interface name, e.g. ``b'wlan0'``. ``bytes`` or ``str`` is
            accepted; only the first 15 bytes are passed to the kernel.

    Returns:
        The address as a dotted-quad string, or ``None`` when creating the
        socket or the ioctl lookup raises ``OSError`` (for example when the
        interface does not exist or has no address).
    """
    siocgifaddr = 0x8915  # SIOCGIFADDR

    # struct's 's' format only takes bytes, so encode text names up front.
    if isinstance(ifname, str):
        ifname = ifname.encode()
    ifreq = struct.pack('256s', ifname[:15])

    try:
        # The context manager closes the socket on every path; the original
        # left the descriptor open.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            reply = fcntl.ioctl(s.fileno(), siocgifaddr, ifreq)
    except OSError:
        return None

    # Bytes 20..23 of the returned buffer hold the packed IPv4 address.
    return socket.inet_ntoa(reply[20:24])
def mainloop():
    """Display worker: take messages off ``q`` and scroll each one once.

    Sets the module-level ``running`` flag to True on entry and loops until
    it is set back to False elsewhere. Each message is shown at its start for
    a second, scrolled to its end, held for a second, then cleared.
    """
    global running

    scrollphathd.rotate(degrees=180)
    scrollphathd.clear()
    scrollphathd.show()

    running = True
    while running:
        scrollphathd.clear()

        try:
            # Block for at most a second: a new message is picked up as soon
            # as it arrives, yet `running` is still re-checked every second.
            status = q.get(timeout=1)
        except queue.Empty:
            continue

        # Render once (the original drew the same string twice). The scroll
        # distance is write_string's return value less the display width and
        # one more column.
        # NOTE(review): presumably the return value is the rendered width in
        # pixels - confirm against the scrollphathd docs.
        status_length = scrollphathd.write_string(
            status, x=0, y=0, font=font5x7, brightness=0.1
        ) - scrollphathd.width - 1

        # Show string at start
        scrollphathd.show()

        # Pause for a second
        time.sleep(1)

        # Scroll through the string
        while status_length > 0:
            scrollphathd.scroll(1)
            scrollphathd.show()
            status_length -= 1
            time.sleep(0.02)

        # Pause again at the end
        time.sleep(1)

        # Clear the display
        scrollphathd.clear()
        scrollphathd.show()

        q.task_done()
zeroconf = Zeroconf()
app = Flask(__name__)

# Worker thread that drives the display; started at the bottom of the script.
t_show = threading.Thread(target=mainloop)


def signal_handler(signal, frame):
    """Shut down cleanly on SIGINT/SIGTERM.

    Unregisters every advertised zeroconf service, closes zeroconf, stops the
    display thread if it is running, then exits the process with status 0.

    Args:
        signal: Signal number passed in by the ``signal`` module (unused).
        frame: Current stack frame passed in by the ``signal`` module (unused).
    """
    global services, running
    # `sys` is not imported at file level, so the original `sys.exit(0)`
    # raised NameError instead of exiting; import it here.
    import sys

    for service in services:
        print("Unregistering:", service)
        zeroconf.unregister_service(service)
    zeroconf.close()

    if running:
        print("Stopping Queue")
        # mainloop() polls this flag; join() waits for its current pass to end.
        running = False
        t_show.join()

    sys.exit(0)
@app.route('/')
def hello_world():
    """Serve an HTML page listing the messages still waiting in the queue.

    Returns:
        An HTML string with a heading and one ``<li>`` per pending message.
    """
    # Local import: `html` is stdlib but not imported at file level.
    import html

    # Queue entries come straight from the `text` query parameter of
    # /message, so escape them before embedding them in markup.
    pending = "<li>".join(html.escape(item) for item in q.to_list())

    return """
<h1>Scrollbot Pending</h1>
<ul><li>{queue}</ul>
""".format(queue=pending)
@app.route('/message')
def message():
    """Queue the ``text`` query parameter for display.

    Returns:
        ``'ok'`` when a ``text`` parameter was supplied and queued,
        ``'no text!'`` when it was absent.
    """
    text = request.args.get('text')

    # Guard clause: nothing to queue without a `text` parameter.
    if text is None:
        return 'no text!'

    q.put(text)
    return 'ok'
# Address of the wireless interface; the mDNS records below advertise it.
addr = get_ip_address(b'wlan0')

if addr is None:
    # get_ip_address() returns None when the lookup fails. Passing that on to
    # socket.inet_aton() would die with an opaque TypeError, so stop here with
    # a clear message instead.
    zeroconf.close()
    raise SystemExit("Could not determine an IPv4 address for wlan0")

packed_addr = socket.inet_aton(addr)

# Services to announce: the HTTP API (port 80) and SSH/SFTP (port 22).
services = (
    ServiceInfo(
        "_http._tcp.local.",
        "PiratePython HTTP._http._tcp.local.",
        packed_addr, 80, 0, 0, {},
        "piratepython.local."),
    ServiceInfo(
        "_sftp-ssh._tcp.local.",
        "PiratePython SSH._sftp-ssh._tcp.local.",
        packed_addr, 22, 0, 0, {},
        "piratepython.local."),
)

for service in services:
    zeroconf.register_service(service)

# Unregister services and stop the display thread on Ctrl-C / termination.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

t_show.start()

# The reloader is disabled so the script (and its thread and service
# registration) is not run a second time in a child process.
app.run(host='0.0.0.0', port=80, debug=False, use_reloader=False)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
This example is for MicroDot pHAT - up to 20171106x you'll need to add the MicroDot pHAT library folder into /libraries | |
""" | |
import microdotphat | |
from flask import Flask, request | |
import threading | |
import queue | |
import signal | |
import time | |
from zeroconf import ServiceInfo, Zeroconf | |
import socket | |
import struct | |
import fcntl | |
# Flag polled by mainloop(): it sets this True when it starts and keeps
# looping until something (the signal handler) sets it back to False.
running = False


class ImprovedQueue(queue.Queue):
    """A ``queue.Queue`` whose pending items can be inspected in place."""

    def to_list(self):
        """Return a snapshot list of the queued items without consuming them.

        The queue's own mutex is held while copying so the snapshot is
        consistent with concurrent ``put``/``get`` calls.
        """
        with self.mutex:
            snapshot = [*self.queue]
        return snapshot


# Messages waiting to be shown on the display.
q = ImprovedQueue()
def get_ip_address(ifname):
    """Return the IPv4 address assigned to a network interface.

    Args:
        ifname: Interface name, e.g. ``b'wlan0'``. ``bytes`` or ``str`` is
            accepted; only the first 15 bytes are passed to the kernel.

    Returns:
        The address as a dotted-quad string, or ``None`` when creating the
        socket or the ioctl lookup raises ``OSError`` (for example when the
        interface does not exist or has no address).
    """
    siocgifaddr = 0x8915  # SIOCGIFADDR

    # struct's 's' format only takes bytes, so encode text names up front.
    if isinstance(ifname, str):
        ifname = ifname.encode()
    ifreq = struct.pack('256s', ifname[:15])

    try:
        # The context manager closes the socket on every path; the original
        # left the descriptor open.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            reply = fcntl.ioctl(s.fileno(), siocgifaddr, ifreq)
    except OSError:
        return None

    # Bytes 20..23 of the returned buffer hold the packed IPv4 address.
    return socket.inet_ntoa(reply[20:24])
def mainloop():
    """Display worker: take messages off ``q`` and scroll each one once.

    Sets the module-level ``running`` flag to True on entry and loops until
    it is set back to False elsewhere. Each message is shown at its start for
    a second, scrolled to its end, held for a second, then cleared.
    """
    global running

    microdotphat.clear()
    microdotphat.show()

    running = True
    while running:
        microdotphat.clear()

        try:
            # Block for at most a second: a new message is picked up as soon
            # as it arrives, yet `running` is still re-checked every second.
            status = q.get(timeout=1)
        except queue.Empty:
            continue

        # Render once (the original drew the same string twice). The scroll
        # distance is write_string's return value less 46.
        # NOTE(review): 46 is presumably the display width in pixels plus
        # one - confirm against the microdotphat docs.
        status_length = microdotphat.write_string(status) - 46

        # Show string at start
        microdotphat.show()

        # Pause for a second
        time.sleep(1)

        # Scroll through the string
        while status_length > 0:
            microdotphat.scroll(1)
            microdotphat.show()
            status_length -= 1
            time.sleep(0.02)

        # Pause again at the end
        time.sleep(1)

        # Clear the display
        microdotphat.clear()
        microdotphat.show()

        q.task_done()
zeroconf = Zeroconf()
app = Flask(__name__)

# Worker thread that drives the display; started at the bottom of the script.
t_show = threading.Thread(target=mainloop)


def signal_handler(signal, frame):
    """Shut down cleanly on SIGINT/SIGTERM.

    Unregisters every advertised zeroconf service, closes zeroconf, stops the
    display thread if it is running, then exits the process with status 0.

    Args:
        signal: Signal number passed in by the ``signal`` module (unused).
        frame: Current stack frame passed in by the ``signal`` module (unused).
    """
    global services, running
    # `sys` is not imported at file level, so the original `sys.exit(0)`
    # raised NameError instead of exiting; import it here.
    import sys

    for service in services:
        print("Unregistering:", service)
        zeroconf.unregister_service(service)
    zeroconf.close()

    if running:
        print("Stopping Queue")
        # mainloop() polls this flag; join() waits for its current pass to end.
        running = False
        t_show.join()

    sys.exit(0)
@app.route('/')
def hello_world():
    """Serve an HTML page listing the messages still waiting in the queue.

    Returns:
        An HTML string with a heading and one ``<li>`` per pending message.
    """
    # Local import: `html` is stdlib but not imported at file level.
    import html

    # Queue entries come straight from the `text` query parameter of
    # /message, so escape them before embedding them in markup.
    pending = "<li>".join(html.escape(item) for item in q.to_list())

    return """
<h1>Scrollbot Pending</h1>
<ul><li>{queue}</ul>
""".format(queue=pending)
@app.route('/message')
def message():
    """Queue the ``text`` query parameter for display.

    Returns:
        ``'ok'`` when a ``text`` parameter was supplied and queued,
        ``'no text!'`` when it was absent.
    """
    text = request.args.get('text')

    # Guard clause: nothing to queue without a `text` parameter.
    if text is None:
        return 'no text!'

    q.put(text)
    return 'ok'
# Address of the wireless interface; the mDNS records below advertise it.
addr = get_ip_address(b'wlan0')

if addr is None:
    # get_ip_address() returns None when the lookup fails. Passing that on to
    # socket.inet_aton() would die with an opaque TypeError, so stop here with
    # a clear message instead.
    zeroconf.close()
    raise SystemExit("Could not determine an IPv4 address for wlan0")

packed_addr = socket.inet_aton(addr)

# Services to announce: the HTTP API (port 80) and SSH/SFTP (port 22).
services = (
    ServiceInfo(
        "_http._tcp.local.",
        "PiratePython HTTP._http._tcp.local.",
        packed_addr, 80, 0, 0, {},
        "piratepython.local."),
    ServiceInfo(
        "_sftp-ssh._tcp.local.",
        "PiratePython SSH._sftp-ssh._tcp.local.",
        packed_addr, 22, 0, 0, {},
        "piratepython.local."),
)

for service in services:
    zeroconf.register_service(service)

# Unregister services and stop the display thread on Ctrl-C / termination.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

t_show.start()

# The reloader is disabled so the script (and its thread and service
# registration) is not run a second time in a child process.
app.run(host='0.0.0.0', port=80, debug=False, use_reloader=False)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment