Skip to content

Instantly share code, notes, and snippets.

@iwalton3
Created May 8, 2022 03:27
Show Gist options
  • Save iwalton3/acc1ea43020ca9cd99543d46cbedf005 to your computer and use it in GitHub Desktop.
Save iwalton3/acc1ea43020ca9cd99543d46cbedf005 to your computer and use it in GitHub Desktop.
Window Fan Controller
# This file is executed on every boot (including wake-boot from deepsleep)
import gc
import ubinascii
def do_connect():
    """Join the configured Wi-Fi network in station mode and print the result.

    If the station interface is not connected yet, the access-point
    interface is switched off, the station interface is switched on and a
    connection is started. The call then blocks until the interface reports
    that it is connected; there is no timeout.

    The interface configuration and MAC address are printed in every case.
    """
    import network

    station = network.WLAN(network.STA_IF)
    access_point = network.WLAN(network.AP_IF)

    if not station.isconnected():
        print('connecting to network...')
        access_point.active(False)
        station.active(True)
        station.connect('SSID_GOES_HERE', 'PASSWORD_GOES_HERE')
        # Spin until the association has completed.
        while not station.isconnected():
            pass

    print('network config:', station.ifconfig())
    print('mac address:', ubinascii.hexlify(station.config('mac'), ':').decode())
# Boot sequence: connect to Wi-Fi first, then run a garbage collection
# before anything else is loaded.
do_connect()
gc.collect()
# Force start before main runs
# (webrepl is imported here, after the network connection has been made)
import webrepl
webrepl.start()
// Global state shared by loop() and set_fan_mode().
bool value = false;   // most recent sample of the data input (pin 1)
int acc = 0;          // bits of the word currently being received, MSB first
int idle_ct = 0;      // polling iterations spent waiting for the input to go high
int acc_ct = 0;       // number of bits shifted into acc so far
int last_acc = 0;     // previous complete 5-bit word, compared against the next one
int last_mode = 8;    // mode bits (value & 24) last applied; also written by setup()
int last_level = 0;   // level bits (value & 7) last applied
bool is_on = false;   // set to true by set_fan_mode(); never set back to false in this sketch
// Drive the five output pins from the low five bits of `value`.
//
//   bit 0 -> pin 0     bit 1 -> pin 7     bit 2 -> pin 8
//   bit 3 -> pin 9     bit 4 -> pin 10
//
// Every line except pin 9 is written inverted (a set bit drives the pin low).
// Pins are written in ascending bit order.
void write_data(int value) {
  struct OutputLine {
    int pin;     // pin number passed to digitalWrite
    int invert;  // 1 when the bit is inverted before being written
  };
  static const OutputLine kLines[5] = {
    {0, 1}, {7, 1}, {8, 1}, {9, 0}, {10, 1},
  };

  for (int bit = 0; bit < 5; ++bit) {
    digitalWrite(kLines[bit].pin, ((value >> bit) & 1) ^ kLines[bit].invert);
  }
}
// One-time initialisation: configure pins 10, 9, 8, 7 and 0 as outputs and
// pin 1 as the data input, then apply the initial mode stored in last_mode.
void setup() {
  static const int kOutputPins[] = {10, 9, 8, 7, 0};
  const unsigned kOutputCount = sizeof(kOutputPins) / sizeof(kOutputPins[0]);

  for (unsigned i = 0; i < kOutputCount; ++i) {
    pinMode(kOutputPins[i], OUTPUT);
  }
  pinMode(1, INPUT);  // data line polled by loop()

  write_data(last_mode);
}
// Return true when `level` is one of the accepted level codes: 0, 1, 2 or 4
// (zero, or exactly one of the three low bits set).
bool validate_level(int level) {
  return level == 0 || level == 1 || level == 2 || level == 4;
}
// Return true when `mode` is one of the accepted mode codes: 0, 8 or 24.
// (16 on its own is rejected.)
bool validate_mode(int mode) {
  return mode == 0 || mode == 8 || mode == 24;
}
// Apply a received 5-bit command word.
//
// `value` is split into mode bits (value & 24) and level bits (value & 7).
// Words whose mode or level is not accepted by validate_mode() /
// validate_level() are ignored. Level 0 writes only the remembered mode bits
// (no level bit set). Any other level goes through a timed sequence of
// writes; this function blocks in delay() for the whole sequence.
void set_fan_mode(int value) {
  int mode = value & 24;
  int level = value & 7;
  if (!validate_level(level) || !validate_mode(mode)) {
    return;
  }
  if (level == 0) {
    // Off: keep the last mode on the outputs with every level bit cleared.
    // NOTE(review): is_on is not cleared here, so once the fan has been on,
    // the 3 s pause below also runs for mode changes made while it is off.
    // Confirm whether that is intended.
    write_data(last_mode);
    return;
  }
  if (mode != last_mode && is_on) {
    // Mode change after the fan has been on: drop the level bits on the old
    // mode and wait 3 s before touching the mode lines.
    write_data(last_mode);
    delay(3000);
  }
  // Set the new mode lines with no level bit, and let them settle for 300 ms.
  write_data(mode);
  delay(300);
  last_mode = mode;
  last_level = level;
  is_on = true;
  if (level != 4) {
    // For levels 1 and 2, run at level code 4 for 2 s before switching to
    // the requested level (presumably a start-up kick at the highest speed;
    // confirm against the hardware).
    write_data(mode | 4);
    delay(2000);
  }
  write_data(mode | level);
}
// Receive one bit per call from the data line on pin 1, and apply a command
// once the same 5-bit word has been received twice in a row.
//
// A bit starts when the line goes high. The line is sampled again 10 ms
// later: still high is read as 1, already low as 0. Bits are shifted into
// `acc` MSB first. If the line was idle for more than kFrameGap polling
// iterations before this bit, any partially received word is discarded.
void loop() {
  // Idle gap (in polling iterations) that marks the start of a new word.
  const int kFrameGap = 512;
  // Saturation value for idle_ct. It only has to exceed kFrameGap; keeping it
  // small guarantees the counter cannot overflow on boards where `int` is
  // 16 bits wide (the previous cap of 100000 can never be reached there, so
  // the counter would run past INT_MAX).
  const int kIdleCap = 1024;

  // Wait for the rising edge, counting how long the line stayed low.
  idle_ct = 0;
  while (!value) {
    value = digitalRead(1);
    if (idle_ct < kIdleCap) {
      idle_ct = idle_ct + 1;
    }
  }

  // Sample 10 ms into the pulse to tell a short pulse (0) from a long one (1).
  delay(10);
  value = digitalRead(1);

  // A long idle period means this bit starts a fresh word.
  if (idle_ct > kFrameGap) {
    acc = 0;
    acc_ct = 0;
  }

  acc = (acc << 1) | value;
  acc_ct = acc_ct + 1;

  if (acc_ct == 5) {
    // Only act when this word matches the previous complete word.
    if (acc == last_acc) {
      set_fan_mode(acc);
    }
    last_acc = acc;
    acc = 0;
    acc_ct = 0;
  }

  // Wait for the line to return low so the next call starts on a new pulse.
  while (value) {
    value = digitalRead(1);
  }
}
from machine import Pin
import pin
import time
import webserver
# Output pin toggled by send_data() to transmit bits; driven low at start-up.
data_send_pin = Pin(pin.D1, Pin.OUT)
data_send_pin.off()
def mode_to_bin(mode):
    """Translate a mode name into its mode bits (bits 3 and 4 of the word).

    "in" -> 0b11000, "out" -> 0b00000, "exchange" -> 0b01000.

    Raises:
        ValueError: if ``mode`` is not one of the three names above.
    """
    known_modes = (
        ("in", 0b11000),
        ("out", 0b00000),
        ("exchange", 0b01000),
    )
    for name, bits in known_modes:
        if mode == name:
            return bits
    raise ValueError("Invalid mode")
# Level bit patterns indexed by level number: index 0 has no bit set,
# indexes 1-3 set bit 0, 1 and 2 respectively.
levels = [0b00000, 0b00001, 0b00010, 0b00100]
# State remembered between requests (updated by fanctl()).
last_mode = "exchange"                  # last mode name that was applied
bin_last_mode = mode_to_bin(last_mode)  # mode bits matching last_mode
last_level = 0                          # last non-zero level applied (0 until first use)
is_on = False                           # False after a level-0 command, True otherwise
def send_data(on):
    """Transmit a single bit as a high pulse followed by a 10 ms low gap.

    A truthy ``on`` keeps the line high for 15 ms, a falsy one for 5 ms.
    """
    data_send_pin.on()
    if on:
        time.sleep(0.015)
    else:
        time.sleep(0.005)
    data_send_pin.off()
    time.sleep(0.01)
def send_bits(data):
    """Transmit the low five bits of ``data``, most significant bit first.

    The word is sent two times back to back so that it can be checked for
    correctness on the receiving side.
    """
    for _ in range(2):
        for shift in (4, 3, 2, 1, 0):
            send_data((data >> shift) & 1)
def fanctl(level=0, mode="in"):
    """Send a fan command and update the remembered state.

    Args:
        level: index into ``levels`` (0 = off, 1-3 = speed). Level 0 sends
            only the remembered mode bits and ignores ``mode``.
        mode: mode name accepted by ``mode_to_bin``; used when ``level`` is
            not 0.

    Raises:
        ValueError: if ``mode`` is not a known mode name.
        IndexError: if ``level`` is outside the ``levels`` table. Negative
            levels are rejected too instead of silently indexing from the
            end of the list.

    Both arguments are validated before any global state is changed, so a
    rejected command leaves ``last_mode``, ``last_level`` and ``is_on``
    untouched.
    """
    global last_mode, is_on, bin_last_mode, last_level
    # 0b54321
    #   ||||+--Low
    #   |||+---Med
    #   ||+----High
    #   |+-----In 1
    #   +------In 2
    if level == 0:
        is_on = False
        send_bits(bin_last_mode)
        return

    if level < 0:
        raise IndexError("Invalid level")

    # Resolve both parts first; either lookup may raise.
    bin_mode = mode_to_bin(mode)
    level_bits = levels[level]

    last_mode = mode
    bin_last_mode = bin_mode
    last_level = level
    is_on = True
    send_bits(bin_mode | level_bits)
# Shared secret; process_request() only acts on requests whose "key"
# argument equals this value.
key = "KEY_GOES_HERE"
def _resolve_level(requested):
    """Turn a "level" argument into a level number.

    "toggle" gives 0 when the fan is on, otherwise the last level (or 1 if
    there is none yet). "last" gives the last level (or 1). Anything else is
    converted with ``int()``, which raises ValueError for non-numeric text.
    """
    fallback = last_level if last_level != 0 else 1
    if requested == "toggle":
        return 0 if is_on else fallback
    if requested == "last":
        return fallback
    return int(requested)


def process_request(args):
    """Handle one request and return the status dict to send back.

    Args:
        args: dict of query-string arguments. Recognised keys are "key",
            "mode" and "level".

    Returns:
        A dict that always contains "mode" (last applied mode name) and
        "level" (last level, or 0 when the fan is off). It also contains
        "error" when the key is wrong or the command failed.

    A request without a "key" argument performs no action and reports no
    error; it only returns the current status.
    """
    response = {}
    if "key" in args:
        if args["key"] == key:
            try:
                do_action = False
                level = last_level
                mode = last_mode
                if "mode" in args:
                    do_action = True
                    mode = args["mode"]
                if "level" in args:
                    do_action = True
                    level = _resolve_level(args["level"])
                if do_action:
                    print("Switching to: level", level, "mode", mode)
                    fanctl(level, mode)
            except Exception as ex:
                # Keep serving after a bad command, but do not swallow
                # KeyboardInterrupt/SystemExit, and log the cause.
                print("Unexpected Error:", str(ex))
                response["error"] = "unexpected error"
        else:
            response["error"] = "wrong key"
    response["mode"] = last_mode
    response["level"] = last_level if is_on else 0
    return response
# Start the HTTP listener. With wdt=600, create() resets the board when more
# than 600 seconds pass without a handled request (counted from the first one).
webserver.create(process_request, wdt=600)
# Board pin labels mapped to the numbers passed to machine.Pin
# (the main script uses pin.D1).
# NOTE(review): the values look like the NodeMCU/ESP8266 label-to-GPIO
# mapping; confirm against the board in use.
D0 = 16
D1 = 5
D2 = 4
D3 = 0
D4 = 2
D5 = 14
D6 = 12
D7 = 13
D8 = 15
RX = 3
TX = 1
SD2 = 9
SD3 = 10
try:
import usocket as socket
except:
import socket
import json
import time
from machine import Timer, reset
def _parse_query(request):
    """Extract the query-string arguments from a raw HTTP request.

    Args:
        request: request bytes as received from the socket.

    Returns:
        dict mapping argument names to values (both ``str``, not
        URL-decoded). Empty pairs are skipped, a pair without ``=`` maps to
        an empty string, and only the first ``=`` separates name from value.

    Raises:
        IndexError: if the request line has no target (e.g. empty request).
    """
    target = request.split(b'\r\n')[0].split(b' ')[1]
    pieces = target.split(b'?')
    args = {}
    if len(pieces) > 1:
        for pair in pieces[1].decode('ascii').split('&'):
            if not pair:
                continue
            fields = pair.split('=', 1)
            args[fields[0]] = fields[1] if len(fields) == 2 else ''
    return args


def create(process_request, wdt=None, wdt_interval=5):
    """Start a minimal HTTP server on port 80 that answers with JSON.

    Args:
        process_request: callable taking the dict of query arguments and
            returning a JSON-serialisable object.
        wdt: optional timeout in seconds. Once the first request has been
            handled, the board is reset when more than ``wdt`` seconds pass
            without another handled request. ``None`` disables this.
        wdt_interval: how often, in seconds, the timeout is checked.

    The function returns after installing the listener; requests are served
    from the socket callback.
    """
    wdt_ctr = -1  # -1 means "not armed": no request has been handled yet

    if wdt is not None:
        def wdt_cb(_):
            nonlocal wdt_ctr
            if wdt_ctr != -1:
                wdt_ctr += wdt_interval
            if wdt_ctr > wdt:
                reset()

        tim = Timer(-1)
        tim.init(period=wdt_interval * 1000, mode=Timer.PERIODIC, callback=wdt_cb)

    def handler(listener):
        nonlocal wdt_ctr
        conn = None
        try:
            conn, _addr = listener.accept()
            request = conn.recv(1024)
            response = json.dumps(process_request(_parse_query(request)))
            wdt_ctr = 0  # a request was handled: (re)start the timeout
            conn.send('HTTP/1.1 200 OK\r\n')
            conn.send('Content-Type: text/html\r\n')
            conn.send('Connection: close\r\n\r\n')
            conn.sendall(response)
        except Exception as ex:
            print("Unexpected Error:", str(ex))
        finally:
            # Always release the connection, including after a malformed
            # request, so failed requests do not leak sockets.
            if conn is not None:
                try:
                    conn.close()
                except Exception as ex:
                    print("Unexpected Error:", str(ex))

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    for _ in range(5):
        try:
            s.bind(('', 80))
            break
        except OSError as e:
            print("Socket error, will retry 5 times...", str(e))
            time.sleep(1)
    s.listen(5)
    # Option 20 has no named constant in the socket module.
    # NOTE(review): presumably MicroPython's "call this function on incoming
    # data" socket hook (as used by webrepl); confirm for the target port.
    s.setsockopt(socket.SOL_SOCKET, 20, handler)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment