Last active
January 9, 2018 21:23
-
-
Save SilverCory/92162e36e988b8c2db3a50c570ba273e to your computer and use it in GitHub Desktop.
Statistics displaying for spacepools on the raspberry pi.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Created for a Raspberry PI and I2C display. | |
# Display: https://www.banggood.com/0_96-Inch-4Pin-White-IIC-I2C-OLED-Display-Module-12864-LED-For-Arduino-p-958196.html | |
import time | |
import Adafruit_GPIO.SPI as SPI | |
import Adafruit_SSD1306 | |
from PIL import Image | |
from PIL import ImageDraw | |
from PIL import ImageFont | |
import urllib2 | |
import schedule | |
import subprocess | |
import threading | |
import json | |
import sys | |
RST = None | |
DC = 23 | |
SPI_PORT = 0 | |
SPI_DEVICE = 0 | |
# 128x32 display with hardware I2C: | |
disp = Adafruit_SSD1306.SSD1306_128_32(rst=RST) | |
# 128x64 display with hardware I2C: | |
# disp = Adafruit_SSD1306.SSD1306_128_64(rst=RST) | |
statsPage = "https://api.etn.spacepools.org/v1/stats/workerStats/etnk8nGKZxFY2Fx5QPmYg6GkvvMaT4bZi1myk2Ekyk1J6LSSReFQMifKrgdm2D5cGvCthw1HmgbUcNs4pWGeSsF52VvbzgYB7B" | |
stats = "{}" | |
statsDecoded = [] | |
erroredTimes = 0 | |
def run_threaded(job_func): | |
job_thread = threading.Thread(target=job_func) | |
job_thread.daemon = True | |
job_thread.start() | |
def timeTick(): | |
global stats | |
global statsDecoded | |
global erroredTimes | |
try: | |
stats = urllib2.urlopen(statsPage).read() | |
statsDecoded = json.loads(stats) | |
if len(statsDecoded) < 1: | |
erroredTimes += 1 | |
else: | |
erroredTimes = 0 | |
except Exception as e: | |
erroredTimes += 1 | |
print("Error loading stats.") | |
print(e) | |
if erroredTimes >= 4: | |
stats = "{}" | |
statsDecoded = [] | |
def runScheduler(): | |
while True: | |
schedule.run_pending() | |
time.sleep(1) | |
disp.begin() | |
disp.clear() | |
disp.display() | |
width = disp.width | |
height = disp.height | |
image = Image.new('1', (width, height)) | |
draw = ImageDraw.Draw(image) | |
draw.rectangle((0,0,width,height), outline=0, fill=0) | |
padding = -2 | |
top = padding | |
bottom = height-padding | |
x = 0 | |
fontLarge = ImageFont.truetype('monofonto.ttf', 16) | |
schedule.every(8).seconds.do(run_threaded, timeTick) | |
run_threaded(runScheduler) | |
def clearScreen(): | |
draw.rectangle((0,0,width,height), outline=0, fill=0) | |
def display(): | |
disp.image(image) | |
disp.display() | |
while True: | |
try: | |
time.sleep(.1) | |
if len(statsDecoded) < 1: | |
clearScreen() | |
draw.text((x, top), "404", font=fontLarge, fill=255) | |
draw.text((x, top+16), "None loaded?", font=fontLarge, fill=255) | |
display() | |
else: | |
decoded = statsDecoded | |
for entry in decoded["workers"]: | |
clearScreen() | |
draw.text((x, top), entry["rigId"], font=fontLarge, fill=255) | |
draw.text((x, top+16), entry["hashRate"], font=fontLarge, fill=255) | |
display() | |
time.sleep(3) | |
except KeyboardInterrupt: | |
print "Bye" | |
sys.exit() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment