Last active
April 29, 2018 09:46
-
-
Save grocid/05f26772c29ffc3341cff14386539352 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hmac | |
import time | |
import requests | |
SHARED_KEY = "hemligt" | |
BASE_URL = "https://myserver.com:8443/?duration={0}&token={1}" | |
def compute_auth(): | |
auth = hmac.new( | |
SHARED_KEY, | |
str(int(time.time())) | |
) | |
return auth.hexdigest() | |
def run(duration): | |
token = compute_auth() | |
duration = BASE_URL.format(duration, token) | |
r = requests.get(duration, verify=False) | |
# run for 9 seconds | |
run(9) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import requests | |
def get_temperature(): | |
data = requests.get("https://wttr.in/karlskrona", verify=False).text | |
# sort of messy data with escape chars... not suitable for regexp. | |
start = "\x1b[0m \x1b[38;5;" | |
end = "\x1b" | |
temp = data.split(start)[1].split(end)[0].split("\x1b[0m") | |
return temp[0].split("m")[1] | |
def get_duration(): | |
try: | |
temp = get_temperature() | |
except: | |
temp = 14 | |
# define function... lets do this from empirical observations. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ssl | |
import time | |
import RPi.GPIO as GPIO | |
import BaseHTTPServer, SimpleHTTPServer | |
import urlparse | |
import hmac | |
PORT = 18 # PIN on Raspberry Pi | |
SHARED_KEY = "hemligt" | |
def pump(duration): | |
GPIO.setmode(GPIO.BCM) | |
GPIO.setwarnings(False) | |
GPIO.setup(PORT, GPIO.OUT) | |
GPIO.output(PORT, GPIO.HIGH) | |
time.sleep(duration) | |
GPIO.output(PORT, GPIO.LOW) | |
class IrrigationHandler(BaseHTTPServer.BaseHTTPRequestHandler): | |
def compute_auth(self, ctime): | |
auth = hmac.new(SHARED_KEY, str(ctime)) | |
return auth.hexdigest() | |
def authenticate(self, token): | |
ctime = int(time.time()) | |
for i in range(-10, 10): | |
if self.compute_auth(ctime + i) == token: | |
return True | |
return False | |
def do_GET(self): | |
o = urlparse.urlparse(self.path) | |
data = urlparse.parse_qs(o.query) | |
if not self.authenticate(data.get("token")[0]): # yep, will stack trace if not given | |
self.send_response(503) | |
self.end_headers() | |
return | |
self.send_response(200) | |
self.send_header("Content-type", "text/html") | |
self.end_headers() | |
try: | |
duration = data.get("duration")[0] | |
pump(int(duration)) | |
except: | |
pump(9) | |
httpd = BaseHTTPServer.HTTPServer( | |
('0.0.0.0', 8443), IrrigationHandler | |
) | |
httpd.socket = ssl.wrap_socket( | |
httpd.socket, certfile='./cert.pem', server_side=True | |
) | |
httpd.serve_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment