Skip to content

Instantly share code, notes, and snippets.

@lachesis
Created June 13, 2023 23:06
Show Gist options
  • Save lachesis/11d5d541d4a1fff2e68f3a185b00b533 to your computer and use it in GitHub Desktop.
Save lachesis/11d5d541d4a1fff2e68f3a185b00b533 to your computer and use it in GitHub Desktop.
Flipper IR explorations
#!/usr/bin/env python3
# Uses flipper as an IR blaster to drive my Bionaire window fan into the desired state
# Also uses my Kasa plug to reset the fan into a known starting state
# deps: pip install pyflipper
# the remote protocol does on-off keying (i.e. ASK) of an IR carrier
# the carrier is a 38 KHz square wave with a 33% duty cycle
# (the fan still seems to work with a 50% duty cycle as well)
# the signal has a base period time T that I have rounded to 400ms
# the following symbols are used:
# SHORT - 1T long
# LONG - 3T long
# BLANK - 15T long
# bits are represented by pairs of (On, Off) timings
# (LONG On, SHORT Off) -> 0
# (SHORT On, LONG Off) -> 1
# (arbitrarily assigned)
# messages are 12 bits long and must be repeated at least 2x to be acted upon
# the Off period of the last bit must be at least 15T (aka BLANK) µs
# this is probably all wrong
# don't expect to use this as is for any other device
# it has lots of janky random hard coded things
# ...but it seems to work
import socket
import time
import sys
import pyflipper.pyflipper
KASA_PLUG_IP = 'kasa-plug3'
FLIPPER_COMM_PORT = '/dev/ttyACM0'
PATH_TO_SAVED_REMOTE = '/ext/infrared/Winfan.ir' # file on the flipper containing the captures
# length in us of short samples vs long samples vs end of msg blank samples
SHORT = 400
LONG = 3 * SHORT # 1200
BLANK = 15 * SHORT # 6000
# length in bits of a message; bold variable name
LEN = 12
# data needed for modelling the state of the Fan
STATE_SET = (
(['OFF', 'LOW', 'MED', 'HIGH'], 'Pwr'),
(['AUTO', 'ON'], 'Mode'),
(['IN', 'OUT', 'BOTH'], 'Airflow'),
)
START_STATE = ('OFF', 'AUTO', 'IN')
flipper = pyflipper.pyflipper.PyFlipper(com=FLIPPER_COMM_PORT)
# IR reverse engineering
def parse_samples(raw_samples):
return [int(x) for x in raw_samples.split(' ')]
def flatten(l):
return [item for sublist in l for item in sublist]
def split_pairs(samples):
return [(samples[i], samples[i+1]) for i in range(0, len(samples)//2*2, 2)]
def bits2num(bits):
return int(''.join(str(s) for s in bits), 2)
def num2bits(num):
return [int(x) for x in bin(num)[2:].rjust(LEN, '0')]
# stop after first BLANK period (aka get minimum message)
def minimize(samples):
# threshold the samples into the symbols SHORT LONG or BLANK
samples = [SHORT if x < SHORT*1.5 else LONG if x < LONG*1.5 else BLANK for x in samples]
for i, v in enumerate(samples):
if i % 2 == 0: continue
if v > LONG*1.5: # must be BLANK
i += 1
break
else:
raise ValueError("Could not find long step")
return samples[:i]
BIT_MAP = {
# pairs of (on-time, off-time)
(LONG, SHORT) : 0,
(SHORT, LONG) : 1,
(SHORT, SHORT) : 'X', # invalid messages
(LONG, LONG) : 'X',
(LONG, BLANK) : 0, # special case the extra long blank at end
(SHORT, BLANK) : 1,
}
def decode(samples):
if isinstance(samples[0], str):
samples = parse_samples(samples)
samples = minimize(samples)
pairs = [(samples[i], samples[i+1]) for i in range(0, len(samples)//2*2, 2)]
bits = [BIT_MAP[p] for p in pairs]
return bits
def encode(bitlist):
pairs = [(SHORT, LONG) if b else (LONG, SHORT) for b in bitlist]
samples = flatten(pairs)
# adding BLANK to existing off would probably be better
# but then the rounded samples from
samples[-1] = BLANK # always end with a blank
return samples
# flipper IR blaster and storage stuff
def read_winfan():
txt = flipper.storage.read(PATH_TO_SAVED_REMOTE)
samps = {}
name = None
data = None
# get the name of the button and its data
for line in txt.split('\n'):
if line.startswith('name:'):
name = line.split(': ')[1]
elif line.startswith('data:'):
data = line.split(': ', 1)[1]
samps[name] = data
return samps
def send(bits):
if isinstance(bits, int):
bits = num2bits(bits)
if isinstance(bits, list):
if len(bits) == LEN:
samples = encode(bits)
samples += samples
elif len(bits) > LEN:
samples = bits
else:
raise ValueError("HUH?")
else:
raise ValueError("HUH?")
# hard coded these based on reading the flipper remote file
flipper.ir.tx_raw(frequency=38000, duty_cycle=0.33, samples=samples)
def devel_main():
# dumb test to make sure bit packing code works
for i in range(2**12):
bits = num2bits(i)
assert len(bits) == 12
assert i == bits2num(bits)
# read file from flipper
samps = read_winfan()
# could also read a remote button press directly with
# print("press remote button now")
# txt = flipper.ir.rx(timeout=5)
# todo: some code that parses the returned serial text
# decode each command & print its bits & number
cmds = {}
for name, raw_samples in samps.items():
samples = parse_samples(raw_samples)
samples = minimize(samples)
bits = decode(samples)
num = bits2num(bits)
cmds[name] = num
print(name.ljust(10, ' '), bits, hex(num))
assert encode(bits) == samples
print(repr(cmds))
# kasa switch stuff
def kasa_scramble(plaintext):
n = len(plaintext)
payload = []
key = 0xAB
for i in range(n):
key = plaintext[i] ^ key
payload.append(key)
return bytes(payload)
def kasa_send(hostname, cmd):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
data = kasa_scramble(cmd)
s.sendto(data, (hostname, 9999))
def kasa_switch(hostname, state):
return kasa_send( hostname, b'{"system":{"set_relay_state":{"state":%d}}}' % (bool(state),) )
# fan state management stuff
def validate_state(state):
if len(state) != len(STATE_SET):
return False
for (state_list, cmd), cur_val in zip(STATE_SET, state):
if cur_val not in state_list:
return False
return True
def steps_needed(state_list, cur_val, new_val):
cur_idx = state_list.index(cur_val)
new_idx = state_list.index(new_val)
if new_idx == cur_idx:
return 0
elif new_idx > cur_idx:
return new_idx - cur_idx
else:
return len(state_list) + new_idx - cur_idx
def compute_todo(cur_state, new_state):
cmds = []
for (state_list, cmd), cur_val, new_val in zip(STATE_SET, cur_state, new_state):
steps = steps_needed(state_list, cur_val, new_val)
cmds.extend( [cmd] * steps )
return cmds
def main():
# generated by devel_main()
commands = {
'Pwr' : 0x239,
'Tup' : 0x23c,
'Tdwn' : 0x25f,
'Mode' : 0x26f,
'Airflow': 0x277,
}
# desired default state for the fan with no inputs
new_state = ('HIGH', 'ON', 'IN')
if len(sys.argv) == 4: # literal state
new_state = [x.upper() for x in sys.argv[1:4]]
if not validate_state(new_state):
raise ValueError("Invalid requested state: %r" % new_state)
if len(sys.argv) == 2: # literal cmd
cmd = sys.argv[1]
send(commands[cmd])
return
state = START_STATE
todo = compute_todo(state, new_state)
print("Going to state:", new_state)
print(todo)
# power cycle to get into known state
kasa_switch(KASA_PLUG_IP, False)
time.sleep(0.7)
kasa_switch(KASA_PLUG_IP, True)
time.sleep(0.7)
# send commands to get into desired state
for cmd in todo:
send(commands[cmd])
time.sleep(1)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment