Skip to content

Instantly share code, notes, and snippets.

@mikolasan
Last active July 17, 2017 06:01
Show Gist options
  • Save mikolasan/248ca74adeecdf9a58b0077289cb7297 to your computer and use it in GitHub Desktop.
Save mikolasan/248ca74adeecdf9a58b0077289cb7297 to your computer and use it in GitHub Desktop.
Relay control
import curses
from curses import wrapper
import serial
import sys
# Keys '1'..'8' select which relay channel the arrow keys act on.
channels = [str(channel_no) for channel_no in range(1, 9)]
def calc_crc(data):
    """Return the one-byte additive checksum of *data*.

    The checksum is the sum of all byte values, truncated to its low 8 bits.
    Truncation matters because the result is appended to a ``bytearray`` as a
    single frame byte, and ``bytearray.append`` rejects values above 255.

    Args:
        data: An iterable of integer byte values (list, bytes, bytearray).

    Returns:
        An int in the range 0..255.
    """
    return sum(data) & 0xFF
def enable_relay(id, state):
    """Return *state* with the bit for relay *id* set.

    Relays are numbered from 1, so relay ``id`` maps to bit ``id - 1``.
    The input value is not modified; the new bitmask is returned.
    """
    relay_bit = 1 << (id - 1)
    return state | relay_bit
def disable_relay(id, state):
    """Return *state* with the bit for relay *id* cleared.

    Relays are numbered from 1, so relay ``id`` maps to bit ``id - 1``.
    The input value is not modified; the new bitmask is returned.
    """
    relay_bit = 1 << (id - 1)
    return state & ~relay_bit
def init_protocol():
    """Define the module-level constants describing the relay protocol.

    Request frame layout (8 bytes):
        1 (0x55) - head
        2 (0x01) - target address
        3 (0x13) - function code (data type)
        4 (0x00) - data 1
        5 (0x00) - data 2
        6 (0x00) - data 3
        7 (0x??) - data 4 (relay state bitmask, filled in per command)
        8 (0x??) - checksum (sum of bytes 1-7)

    Only the first six bytes are fixed; they are stored in ``command_base``.
    The remaining two are appended when a command is generated.
    """
    # ``global`` only declares names; the assignments must be separate
    # statements (``global x = value`` is a syntax error).
    global command_base, command_base_crc
    global answer_length, answer_head, answer_address, answer_data_type

    data_frame = [0x55, 0x01, 0x13, 0x00, 0x00, 0x00]
    command_base = bytearray(data_frame)
    command_base_crc = calc_crc(command_base)

    # Expected size of a reply and the values of its first three bytes.
    answer_length = 8
    answer_head = 0x22
    answer_address = 0x01
    answer_data_type = 0x00
def init():
    """Open the serial port, set up the protocol and start curses.

    The serial device path is taken from the first command-line argument;
    when it is missing or empty, ``/dev/ttyUSB0`` is used. The port is opened
    before curses is started, so a failure to open it is reported on a normal
    terminal.

    Sets the module-level names ``ser`` (serial port), ``state`` (current
    relay bitmask, initially 0) and ``stdscr`` (curses screen).
    """
    # ``global`` only declares names; the assignments must be separate
    # statements (``global x = value`` is a syntax error).
    global ser, state, stdscr

    if len(sys.argv) > 1 and sys.argv[1]:
        port = sys.argv[1]
    else:
        port = "/dev/ttyUSB0"
    ser = serial.Serial(port, timeout=2)

    init_protocol()
    state = 0x00

    stdscr = curses.initscr()
    curses.noecho()
    curses.cbreak()
    stdscr.keypad(True)
    # clear screen
    stdscr.clear()
def gen_command(state):
    """Build the complete request frame for the given relay bitmask.

    The frame is a copy of ``command_base`` followed by *state* and a
    one-byte checksum over all preceding bytes.

    Args:
        state: Relay bitmask; must fit in one byte (0..255), otherwise
            ``bytearray.append`` raises ``ValueError``.

    Returns:
        A new ``bytearray`` holding the frame; ``command_base`` is left
        untouched.
    """
    command = bytearray(command_base)
    command.append(state)
    # checksum: keep only the low byte, because the sum of the frame bytes
    # exceeds 255 for larger state values and append() rejects such values.
    command.append(calc_crc(command) & 0xFF)
    return command
def parse_answer(bytes):
    """Validate a reply frame and sync the global relay state with it.

    The first three bytes are compared with ``answer_head``,
    ``answer_address`` and ``answer_data_type``. When they match, byte 6 is
    taken as the relay bitmask reported in the reply and stored in the
    module-level ``state`` if it differs from the current value.

    Args:
        bytes: The raw reply, indexable as integers (e.g. a ``bytes`` object
            returned by a serial read). May be shorter than a full frame.
    """
    # Without this declaration the assignment below would make ``state`` a
    # local name and the comparison would raise UnboundLocalError.
    global state

    state_index = 6
    # A timed-out read can return fewer bytes than a full frame; report it
    # as a failed check instead of raising IndexError.
    if len(bytes) <= state_index:
        print("check state [FAIL]", bytes)
        return

    header_matches = (
        bytes[0] == answer_head
        and bytes[1] == answer_address
        and bytes[2] == answer_data_type
    )
    if not header_matches:
        print("check state [FAIL]", bytes)
        return

    if state != bytes[state_index]:
        state = bytes[state_index]
        print("Update state", state)
    else:
        print("Check state [OK]")
def change_state(relay_state):
    """Send *relay_state* to the relay board and process its reply.

    Writes the generated command frame to the serial port, reads up to
    ``answer_length`` bytes back and hands them to ``parse_answer``.
    """
    request = gen_command(relay_state)
    ser.write(request)
    reply = ser.read(answer_length)
    parse_answer(reply)
def loop():
    """Run the interactive key loop until 'q' is pressed.

    Keys:
        '1'..'8'   - select the relay channel (channel 1 is selected at start)
        Down arrow - enable the selected relay
        Up arrow   - disable the selected relay
        'q'        - leave the loop
    """
    number = 1
    while True:
        c = stdscr.getkey()
        if c in channels:
            number = int(c)
        # getkey() returns strings: special keys come back as their key
        # name, so they must be compared with "KEY_DOWN"/"KEY_UP" rather
        # than with the integer constants curses.KEY_DOWN/curses.KEY_UP.
        elif c == "KEY_DOWN":
            change_state(enable_relay(number, state))
        elif c == "KEY_UP":
            change_state(disable_relay(number, state))
        elif c == 'q':
            break
def finish():
    """Restore the terminal settings and close the serial port.

    Reverses the curses setup (cbreak, keypad, echo) and ends curses mode.
    The serial port is closed even if restoring the terminal raises.
    """
    try:
        curses.nocbreak()
        stdscr.keypad(False)
        curses.echo()
        curses.endwin()
    finally:
        # Release the port regardless of how the curses teardown went.
        ser.close()
# Script entry point. init() already puts the terminal into curses mode and
# finish() restores it, so loop() is called directly. The previous
# ``wrapper(loop())`` ran loop() first and then passed its None result to
# wrapper, which failed when it tried to call it.
init()
try:
    loop()
finally:
    # Always restore the terminal and release the serial port, even when the
    # loop is left through an exception (for example Ctrl-C).
    finish()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment