Skip to content

Instantly share code, notes, and snippets.

@SilverCory
Last active January 9, 2018 21:23
Show Gist options
  • Save SilverCory/92162e36e988b8c2db3a50c570ba273e to your computer and use it in GitHub Desktop.
Save SilverCory/92162e36e988b8c2db3a50c570ba273e to your computer and use it in GitHub Desktop.
Displays statistics for spacepools on a Raspberry Pi.
#!/usr/bin/python
# Created for a Raspberry Pi with an I2C display.
# Display: https://www.banggood.com/0_96-Inch-4Pin-White-IIC-I2C-OLED-Display-Module-12864-LED-For-Arduino-p-958196.html
import time
import Adafruit_GPIO.SPI as SPI
import Adafruit_SSD1306
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
import urllib2
import schedule
import subprocess
import threading
import json
import sys
# Wiring constants. Only RST is used in the visible source (it is passed to
# the display constructor); DC, SPI_PORT and SPI_DEVICE are defined but not
# referenced anywhere else in this file.
RST = None
DC, SPI_PORT, SPI_DEVICE = 23, 0, 0
# Display driver object shared by the rest of the script (drawing and
# start-up code both go through `disp`).
# NOTE(review): the product link in the file header mentions a "12864"
# module, while the active line below builds the 128x32 driver — confirm
# which panel size is actually attached.
# 128x32 display with hardware I2C:
disp = Adafruit_SSD1306.SSD1306_128_32(rst=RST)
# Alternative for a 128x64 display with hardware I2C (use instead of the
# line above):
# disp = Adafruit_SSD1306.SSD1306_128_64(rst=RST)
# Endpoint polled for worker statistics. The last path segment is presumably
# the wallet address being tracked — confirm against the pool's API.
statsPage = (
    "https://api.etn.spacepools.org/v1/stats/workerStats/"
    "etnk8nGKZxFY2Fx5QPmYg6GkvvMaT4bZi1myk2Ekyk1J6LSSReFQMifKrgdm2D5cGvCthw1HmgbUcNs4pWGeSsF52VvbzgYB7B"
)

# State shared between the polling job and the drawing loop.
erroredTimes = 0   # consecutive failed or empty polls
statsDecoded = []  # last decoded payload; empty means nothing to show
stats = "{}"       # raw text of the last payload
def run_threaded(job_func, *args, **kwargs):
    """Run ``job_func`` on a new daemon thread and return that thread.

    The thread is marked as a daemon so it never keeps the process alive
    once the main thread exits.

    Args:
        job_func: Callable to execute in the background.
        *args: Optional positional arguments forwarded to ``job_func``.
        **kwargs: Optional keyword arguments forwarded to ``job_func``.

    Returns:
        The already-started ``threading.Thread``, so a caller may ``join()``
        or inspect it. Callers that ignore the return value are unaffected.
    """
    job_thread = threading.Thread(target=job_func, args=args, kwargs=kwargs)
    job_thread.daemon = True
    job_thread.start()
    return job_thread
def timeTick():
    """Poll the stats endpoint and publish the result for the drawing loop.

    Downloads ``statsPage``, decodes the JSON body, and on success stores the
    raw text in ``stats`` and the decoded value in ``statsDecoded``.
    ``erroredTimes`` counts consecutive failed or empty polls; once it
    reaches four, the cached values are reset to their empty defaults.

    Any exception raised while fetching or decoding is caught, reported on
    stdout and counted as a failed poll, so this function does not raise.
    """
    global stats
    global statsDecoded
    global erroredTimes
    try:
        # This job is started on a fresh thread at a fixed interval, so a
        # request that never answers must not block its thread forever.
        # The timeout is kept below the polling interval.
        response = urllib2.urlopen(statsPage, timeout=5)
        try:
            payload = response.read()
        finally:
            # Always release the connection, even if read() fails.
            response.close()
        decoded = json.loads(payload)
        # Evaluated inside the try block on purpose: a payload without a
        # length (e.g. JSON null or a number) counts as a failed poll.
        is_empty = len(decoded) < 1
    except Exception as e:
        erroredTimes += 1
        print("Error loading stats.")
        print(e)
    else:
        # Publish only after the payload has been fully validated, so the
        # drawing loop never sees a value it cannot take len() of.
        stats = payload
        statsDecoded = decoded
        if is_empty:
            erroredTimes += 1
        else:
            erroredTimes = 0
    if erroredTimes >= 4:
        # Too many bad polls in a row: drop the stale data.
        stats = "{}"
        statsDecoded = []
def runScheduler():
    """Drive the ``schedule`` library forever, checking for due jobs once a second.

    Never returns; intended to be run on a background thread.
    """
    poll_interval = 1  # seconds between checks for due jobs
    while True:
        schedule.run_pending()
        time.sleep(poll_interval)
# Bring the panel up and push an empty buffer before anything is drawn.
# NOTE(review): the exact semantics of begin()/clear()/display() come from
# the Adafruit_SSD1306 driver — confirm against its documentation.
disp.begin()
disp.clear()
disp.display()
# Off-screen image with the same dimensions as the panel; mode '1' is
# Pillow's 1-bit pixel mode. All drawing goes through `draw`.
width = disp.width
height = disp.height
image = Image.new('1', (width, height))
draw = ImageDraw.Draw(image)
# Start from a frame filled with 0.
draw.rectangle((0,0,width,height), outline=0, fill=0)
# Text layout: the first text row is drawn at y = top, which is -2 here.
padding = -2
top = padding
# `bottom` is computed but not referenced elsewhere in the visible source.
bottom = height-padding
x = 0
# 16-point font loaded by a relative file name, so 'monofonto.ttf' must be
# resolvable from where the script runs — TODO confirm the lookup rules.
fontLarge = ImageFont.truetype('monofonto.ttf', 16)
# Every 8 seconds, start timeTick on its own daemon thread via run_threaded.
schedule.every(8).seconds.do(run_threaded, timeTick)
# Run the scheduler's polling loop on a background daemon thread as well,
# leaving the main thread free for the drawing loop that follows.
run_threaded(runScheduler)
def clearScreen():
    """Blank the off-screen image by painting a zero-filled rectangle over it.

    Only the in-memory image is changed; nothing is sent to the panel here.
    """
    full_frame = (0, 0, width, height)
    draw.rectangle(full_frame, outline=0, fill=0)
def display():
    """Hand the off-screen image to the display driver and refresh the panel."""
    # Load the current image into the driver, then ask it to show it.
    disp.image(image)
    disp.display()
def _render(first_line, second_line):
    """Draw two rows of text on a blank frame and push it to the panel."""
    clearScreen()
    draw.text((x, top), first_line, font=fontLarge, fill=255)
    draw.text((x, top + 16), second_line, font=fontLarge, fill=255)
    display()


def _as_text(value):
    """Return ``value`` as text so numeric JSON fields can be drawn too."""
    return u"%s" % (value,)


# Main drawing loop: cycle through the workers of the latest stats snapshot,
# showing each one for three seconds, or show a placeholder when there is
# nothing to display. Ctrl-C prints a farewell and exits.
try:
    while True:
        time.sleep(.1)
        # Take one snapshot: the polling thread may rebind statsDecoded at
        # any moment, so every check below must look at the same object.
        decoded = statsDecoded
        # Assumes the payload is an object with a "workers" list of objects
        # carrying "rigId" and "hashRate" — TODO confirm against the API.
        # Anything else is treated as "no data" instead of crashing the loop.
        workers = decoded.get("workers") if isinstance(decoded, dict) else None
        if not workers:
            _render("404", "None loaded?")
            continue
        for entry in workers:
            _render(
                _as_text(entry.get("rigId", "?")),
                _as_text(entry.get("hashRate", "?")),
            )
            time.sleep(3)
except KeyboardInterrupt:
    print("Bye")
    sys.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment