Skip to content

Instantly share code, notes, and snippets.

@cleverdevil
Last active August 4, 2017 23:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cleverdevil/1cf7b670c4cf9fcf5679758e167f1180 to your computer and use it in GitHub Desktop.
Save cleverdevil/1cf7b670c4cf9fcf5679758e167f1180 to your computer and use it in GitHub Desktop.
Scripts for updating 🐷🔥.ws
import stoker
import micropub
import time
# Where posts are sent, the token that authorizes them, and the
# syndication target passed along with every post.
MICROPUB_ENDPOINT = 'https://xn--cp8h2o.ws/micropub/endpoint'
INDIEAUTH_TOKEN = 'REDACTED'
SYNDICATE_TO = 'twitter::JonsHotMeat'

# Creating the Stoker fetches the controller's readings once and then
# keeps refreshing them in the background; ``timeout`` is the number of
# seconds it waits between refreshes.
s = stoker.Stoker('bbq.lacour.local', timeout=120)

# Message template. Each numeric placeholder is followed by a degree
# sign, so the fragments between the placeholders are joined on it.
d = u"\N{DEGREE SIGN}"
tmpl = d.join((
    "Cooker temperature: %0.2f",
    "F. Meat temperature: %0.2f",
    "F (target %0.2f",
    "F).",
))

# Loop flag read by main(); cleared when the script is shutting down.
run = True
def main():
    """Publish the current cooker and meat readings every five minutes.

    Runs for as long as the module-level ``run`` flag is true. On each
    pass the message is formatted from the Stoker's ``Cooker`` and
    ``Meat`` sensors, echoed to stdout, and posted via ``micropub.post``.
    """
    while run:
        # Order matches the placeholders in ``tmpl``: cooker reading,
        # meat reading, meat target.
        readings = (
            s.sensors.Cooker.tc,
            s.sensors.Meat.tc,
            s.sensors.Meat.ta,
        )
        message = tmpl % readings
        print(message)
        micropub.post(
            message, INDIEAUTH_TOKEN, MICROPUB_ENDPOINT, SYNDICATE_TO
        )
        time.sleep(300)
if __name__ == '__main__':
    # Run the posting loop until it is interrupted or fails.
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the script; exit quietly.
        pass
    finally:
        # Always stop the Stoker's background refreshes, not only on
        # Ctrl-C: if main() dies from any other exception (for example a
        # network error while posting) the refresh thread would otherwise
        # keep the process alive after the traceback.
        run = False
        s.quit()
import requests
def post(content, token, endpoint, syndicate):
    """Create an ``h-entry`` post on a Micropub endpoint.

    The request is sent form-encoded, with the access token included as
    the ``access_token`` form field. The outcome is reported on stdout;
    nothing is returned.

    Args:
        content: Text of the post.
        token: Access token sent with the request.
        endpoint: URL of the Micropub endpoint.
        syndicate: Value sent as ``mp-syndicate-to``.

    Raises:
        Whatever ``requests.post`` raises on connection problems; this
        function does not catch it.
    """
    data = {
        'h': 'entry',
        'content': content,
        'access_token': token,
        'mp-syndicate-to': syndicate,
    }

    result = requests.post(endpoint, data=data)

    # A Micropub server answers a successful create with 201 Created or,
    # when it processes the post asynchronously, 202 Accepted. 200 is
    # still accepted for servers that reply with a plain OK.
    if result.status_code in (200, 201, 202):
        print('Published successfully.')
    else:
        print(
            'Failed to publish post with status code: %d'
            % result.status_code
        )
import json
import time
import requests
import attrdict
import threading
class Stoker(object):
    """Cache the readings of a Stoker controller and keep them fresh.

    The controller's ``stoker.json`` document is fetched once in the
    constructor and then again every ``timeout`` seconds on a background
    thread until ``quit()`` is called. The latest readings are exposed as
    two attributes:

    * ``sensors`` -- ``AttrDict`` mapping sensor name to that sensor's data
    * ``blowers`` -- ``AttrDict`` mapping blower name to that blower's data

    Both attributes are always replaced with fully built mappings, so a
    reader never observes an empty or half-populated set of readings.
    """

    def __init__(self, ip_address, timeout=60, request_timeout=30):
        """Fetch the initial readings and start the background refresh.

        Args:
            ip_address: Host name or IP address of the controller.
            timeout: Seconds to wait between refreshes (this is the
                polling interval, not an HTTP timeout).
            request_timeout: Seconds to wait for each HTTP request before
                giving up; ``None`` waits indefinitely.

        Raises:
            Any error from the initial fetch propagates to the caller; in
            that case no background thread is started.
        """
        self._ip_address = ip_address
        self._timeout = timeout
        self._request_timeout = request_timeout
        self._data = None
        self._lock = threading.Lock()
        self._quit = threading.Event()

        # Fetch synchronously first so ``sensors``/``blowers`` exist as
        # soon as the constructor returns.
        self._update()
        threading.Thread(target=self._poll).start()

    def quit(self):
        """Ask the background refresh thread to stop."""
        self._quit.set()

    def _update(self):
        """Fetch ``stoker.json`` and publish the parsed readings."""
        print("Updating...")
        # Without a timeout a stalled connection would block this thread
        # forever, and quit() could never take effect.
        response = requests.get(
            'http://%s/stoker.json' % self._ip_address,
            timeout=self._request_timeout
        )
        data = response.json()

        # Build the new mappings off to the side and only then swap them
        # in: readers access ``sensors``/``blowers`` without the lock, so
        # filling the live attributes in place would briefly expose
        # missing entries.
        sensors = attrdict.AttrDict()
        for sensor in data['stoker']['sensors']:
            sensors[sensor['name']] = attrdict.AttrDict(sensor)

        blowers = attrdict.AttrDict()
        for blower in data['stoker']['blowers']:
            blowers[blower['name']] = attrdict.AttrDict(blower)

        with self._lock:
            self.sensors = sensors
            self.blowers = blowers

    def _poll(self):
        """Refresh every ``timeout`` seconds until ``quit()`` is called."""
        # Event.wait() returns True as soon as quit() sets the event and
        # False when the interval elapses without it being set.
        while not self._quit.wait(self._timeout):
            try:
                self._update()
            except Exception as exc:
                # Thread top level: report the failure and keep the
                # previous readings rather than letting one bad refresh
                # end polling for good.
                print('Update failed: %s' % exc)
        print('Quitting...')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment