Last active
July 17, 2017 06:01
-
-
Save mikolasan/248ca74adeecdf9a58b0077289cb7297 to your computer and use it in GitHub Desktop.
Relay control
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import curses | |
from curses import wrapper | |
import serial | |
import sys | |
channels = ['1', '2', '3', '4', '5', '6', '7', '8'] | |
def calc_crc(data): | |
return sum(data) | |
def enable_relay(id, state): | |
state = state | (1 << (id - 1)) | |
return state | |
def disable_relay(id, state): | |
state = state & (~(1 << (id - 1))) | |
return state | |
def init_protocol(): | |
# 1 (0x55) - head | |
# 2 (0x01) - target address | |
# 3 (0x13) - function code (data type) | |
# 4 (0x00) - data 1 | |
# 5 (0x00) - data 2 | |
# 6 (0x00) - data 3 | |
# 7 (0x??) - data 4 | |
# 8 (0x00) - checksum (sum 1-7) | |
data_frame = [0x55, 0x01, 0x13, 0x00, 0x00, 0x00] | |
global command_base = bytearray(data_frame) | |
global command_base_crc = calc_crc(command_base) | |
global answer_length = 8 | |
global answer_head = 0x22 | |
global answer_address = 0x01 | |
global answer_data_type = 0x00 | |
def init(): | |
port = len(sys.argv) > 1 and sys.argv[1] or "/dev/ttyUSB0" | |
global ser = serial.Serial(port, timeout = 2) | |
init_protocol() | |
global state = 0x00 | |
global stdscr = curses.initscr() | |
curses.noecho() | |
curses.cbreak() | |
stdscr.keypad(True) | |
# clear screen | |
stdscr.clear() | |
def gen_command(state): | |
command = bytearray(command_base) | |
command.append(state) | |
# checksum | |
command.append(calc_crc(command)) | |
return command | |
def parse_answer(bytes): | |
if (bytes[0] == answer_head and bytes[1] == answer_address and | |
bytes[2] == answer_data_type): | |
if state != bytes[6]: | |
state = bytes[6] | |
print("Update state", state) | |
else: | |
print("Check state [OK]") | |
else: | |
print("check state [FAIL]", bytes) | |
def change_state(relay_state): | |
ser.write(gen_command(relay_state)) | |
answer = ser.read(answer_length) | |
parse_answer(answer) | |
def loop(): | |
number = 1 | |
while True: | |
c = stdscr.getkey() | |
if c in channels: | |
number = int(c) | |
elif c == curses.KEY_DOWN: | |
change_state(enable_relay(number, state)) | |
elif c == curses.KEY_UP: | |
change_state(disable_relay(number, state)) | |
elif c == 'q': | |
break | |
def finish(): | |
curses.nocbreak() | |
stdscr.keypad(False) | |
curses.echo() | |
curses.endwin() | |
ser.close() | |
init() | |
wrapper(loop()) | |
finish() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment