Created
May 29, 2024 08:15
-
-
Save halida/8d443347a95bf51c8fe7f7641c48cf5d to your computer and use it in GitHub Desktop.
PiKVM setup for the XH-HK4401 switch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# PiKVM override: XH-HK4401 switch on a serial port, two Wake-on-LAN
# senders and a shutdown command, all exposed through the GPIO menu.
kvmd:
    msd:
        type: disabled

    gpio:
        drivers:
            # Serial-attached switch; "protocol: 2" selects the second wire
            # format supported by the xh_hk4401 plugin (allowed values: 1..2).
            hk:
                type: xh_hk4401
                protocol: 2
                device: /dev/ttyUSB0

            # MAC addresses are quoted on purpose: an unquoted value built only
            # from digit groups <= 59 (like 14:14:14:14:14:14) is resolved as
            # a base-60 integer by YAML 1.1 parsers instead of a string.
            wol_server1:
                type: wol
                mac: "14:14:14:14:14:14"
            wol_server2:
                type: wol
                mac: "14:14:14:14:14:14"
                ip: 192.168.0.126

            shutdown:
                type: cmd
                cmd: [/usr/bin/sudo, /usr/sbin/shutdown, -h, now]

        scheme:
            # Inputs: one indicator per switch channel (pins 0..3).
            ch0_led:
                driver: hk
                pin: 0
                mode: input
            ch1_led:
                driver: hk
                pin: 1
                mode: input
            ch2_led:
                driver: hk
                pin: 2
                mode: input
            ch3_led:
                driver: hk
                pin: 3
                mode: input

            # Outputs: one non-switch (button-like) output per channel.
            ch0_button:
                driver: hk
                pin: 0
                mode: output
                switch: false
            ch1_button:
                driver: hk
                pin: 1
                mode: output
                switch: false
            ch2_button:
                driver: hk
                pin: 2
                mode: output
                switch: false
            ch3_button:
                driver: hk
                pin: 3
                mode: output
                switch: false

            # Outputs bound to the WoL and shutdown drivers declared above.
            wol_server1:
                driver: wol_server1
                pin: 0
                mode: output
                switch: false
            wol_server2:
                driver: wol_server2
                pin: 0
                mode: output
                switch: false
            shutdown_button:
                driver: shutdown
                pin: 0
                mode: output
                switch: false

        view:
            header:
                title: Wol
            # One list per menu row; items refer to the scheme entries above.
            table:
                - ["shutdown_button|confirm|Shutdown PiKVM"]
                - ["#XPS", "wol_server1|Send"]
                - ["#AMD", "wol_server2|Send"]
                - ["#Input 1", ch0_led, ch0_button]
                - ["#Input 2", ch1_led, ch1_button]
                - ["#Input 3", ch2_led, ch2_button]
                - ["#Input 4", ch3_led, ch3_button]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ========================================================================== # | |
# # | |
# KVMD - The main PiKVM daemon. # | |
# # | |
# Copyright (C) 2018-2023 Maxim Devaev <mdevaev@gmail.com> # | |
# 2021-2021 Sebastian Goscik <sebastian.goscik@live.co.uk> # | |
# # | |
# This program is free software: you can redistribute it and/or modify # | |
# it under the terms of the GNU General Public License as published by # | |
# the Free Software Foundation, either version 3 of the License, or # | |
# (at your option) any later version. # | |
# # | |
# This program is distributed in the hope that it will be useful, # | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of # | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # | |
# GNU General Public License for more details. # | |
# # | |
# You should have received a copy of the GNU General Public License # | |
# along with this program. If not, see <https://www.gnu.org/licenses/>. # | |
# # | |
# ========================================================================== # | |
import re | |
import multiprocessing | |
import functools | |
import errno | |
import time | |
from typing import Callable | |
from typing import Any | |
import serial | |
from ...logging import get_logger | |
from ... import aiotools | |
from ... import aiomulti | |
from ... import aioproc | |
from ...yamlconf import Option | |
from ...validators.basic import valid_number | |
from ...validators.basic import valid_float_f01 | |
from ...validators.os import valid_abs_path | |
from ...validators.hw import valid_tty_speed | |
from . import GpioDriverOfflineError | |
from . import BaseUserGpioDriver | |
# ===== | |
class Plugin(BaseUserGpioDriver):  # pylint: disable=too-many-instance-attributes
    """User GPIO driver for the XH-HK4401 switch attached to a serial port.

    All serial I/O is done by a separate worker process. Channel requests are
    handed to the worker through one multiprocessing queue, and the channel
    numbers it reports come back through another one.

    The cached channel value is ``-1`` right after the port has been opened,
    ``0..3`` once a channel is known, and ``None`` after the worker hit an
    error (the driver is then treated as offline).
    """

    def __init__(
        self,
        instance_name: str,
        notifier: aiotools.AioNotifier,

        device_path: str,
        speed: int,
        read_timeout: float,
        protocol: int,
    ) -> None:
        """Store the serial settings and create the IPC primitives.

        No process is started and no port is opened here; see prepare().
        """

        super().__init__(instance_name, notifier)

        # Serial port settings.
        self.__device_path = device_path
        self.__speed = speed
        self.__read_timeout = read_timeout
        self.__protocol = protocol  # https://github.com/pikvm/kvmd/pull/158

        # Requests to the worker (channel numbers to switch to) ...
        self.__ctl_queue: "multiprocessing.Queue[int]" = multiprocessing.Queue()
        # ... and reports from it (current channel, or None on failure).
        self.__channel_queue: "multiprocessing.Queue[int | None]" = multiprocessing.Queue()

        # Last channel value taken from the report queue by run().
        self.__channel: (int | None) = -1

        self.__proc: (multiprocessing.Process | None) = None
        self.__stop_event = multiprocessing.Event()

    @classmethod
    def get_plugin_options(cls) -> dict:
        """Describe the config options of the plugin with their defaults.

        The "device" option is passed to the constructor as ``device_path``.
        """

        protocol_validator = functools.partial(valid_number, min=1, max=2)
        return {
            "device":       Option("", type=valid_abs_path, unpack_as="device_path"),
            "speed":        Option(19200, type=valid_tty_speed),
            "read_timeout": Option(2.0, type=valid_float_f01),
            "protocol":     Option(1, type=protocol_validator),
        }

    @classmethod
    def get_pin_validator(cls) -> Callable[[Any], Any]:
        """Return a validator that accepts pin numbers from 0 to 3."""

        return functools.partial(valid_number, min=0, max=3, name="XH-HK4401 channel")

    def prepare(self) -> None:
        """Create and start the daemonic serial worker process (only once)."""

        assert self.__proc is None
        worker = multiprocessing.Process(target=self.__serial_worker, daemon=True)
        self.__proc = worker
        worker.start()

    async def run(self) -> None:
        """Poll the report queue forever and notify about channel changes."""

        while True:
            (received, latest) = await aiomulti.queue_get_last(self.__channel_queue, 1)
            if not received:
                continue
            if self.__channel != latest:
                self.__channel = latest
                self._notifier.notify()

    async def cleanup(self) -> None:
        """Ask the worker process to stop and join it, if it was created."""

        proc = self.__proc
        if proc is None:
            return
        if proc.is_alive():
            get_logger(0).info("Stopping %s daemon ...", self)
            self.__stop_event.set()
        if proc.is_alive() or proc.exitcode is not None:
            proc.join()

    async def read(self, pin: str) -> bool:
        """Tell whether the given pin is the currently cached channel.

        Raises GpioDriverOfflineError if the driver is not online.
        """

        if not self.__is_online():
            raise GpioDriverOfflineError(self)
        return (self.__channel == int(pin))

    async def write(self, pin: str, state: bool) -> None:
        """Queue a switch to the given pin when the state is true.

        A false state is ignored. Raises GpioDriverOfflineError if the driver
        is not online.
        """

        if not self.__is_online():
            raise GpioDriverOfflineError(self)
        if not state:
            return
        self.__ctl_queue.put_nowait(int(pin))

    # =====

    def __is_online(self) -> bool:
        """Online means: the worker is alive and the cached channel is not None."""

        proc = self.__proc
        if proc is None or not proc.is_alive():
            return False
        return (self.__channel is not None)

    def __serial_worker(self) -> None:
        """Body of the worker process: serve the serial port until stopped.

        On any exception None is reported to the parent, the error is logged
        and the port is reopened after a one second pause.
        """

        logger = aioproc.settle(str(self), f"gpio-xh-hk4401-{self._instance_name}")
        while not self.__stop_event.is_set():
            try:
                with self.__get_serial() as tty:
                    buf = b""
                    # The port is open, but the channel is not known yet.
                    self.__channel_queue.put_nowait(-1)

                    # Wait for first port heartbeat to set correct channel (~2 sec max).
                    # Only for the classic switch with protocol version 1.
                    while not self.__stop_event.is_set():
                        (reported, buf) = self.__recv_channel(tty, buf)
                        if reported is not None:
                            self.__channel_queue.put_nowait(reported)

                        (has_request, wanted) = aiomulti.queue_get_last_sync(self.__ctl_queue, 0.1)  # type: ignore
                        if not has_request:
                            continue
                        assert wanted is not None
                        self.__send_channel(tty, wanted)
                        if self.__protocol == 2:
                            # With protocol 2 the requested channel is
                            # reported right after sending the command.
                            self.__channel_queue.put_nowait(wanted)

            except Exception as err:
                self.__channel_queue.put_nowait(None)
                device_missing = (
                    isinstance(err, serial.SerialException)
                    and err.errno == errno.ENOENT  # pylint: disable=no-member
                )
                if device_missing:
                    logger.error("Missing %s serial device: %s", self, self.__device_path)
                else:
                    logger.exception("Unexpected %s error", self)
                time.sleep(1)

    def __get_serial(self) -> serial.Serial:
        """Open the configured serial device with the configured speed and timeout."""

        return serial.Serial(
            self.__device_path,
            self.__speed,
            timeout=self.__read_timeout,
        )

    def __recv_channel(self, tty: serial.Serial, data: bytes) -> tuple[(int | None), bytes]:
        """Read pending bytes and look for the latest channel report in them.

        ``data`` is the tail kept from the previous call. Returns the
        zero-based channel of the last report found (None if there is none,
        or nothing was pending) and the buffer to pass to the next call.
        """

        if not tty.in_waiting:
            return (None, data)

        data += tty.read_all()

        # The report pattern and the position of the two-digit port number
        # inside a match both depend on the protocol version.
        if self.__protocol == 1:
            pattern = b"AG0[1-4]gA"
            digits = slice(2, 4)
        else:
            pattern = b"G0[1-4]gA\x00"
            digits = slice(1, 3)

        channel: (int | None) = None
        reports = re.findall(pattern, data)
        if reports:
            try:
                # Ports on the wire are 1-based, channels are 0-based.
                channel = int(reports[-1][digits]) - 1
            except Exception:
                channel = None

        # Keep only a short tail for the next call.
        return (channel, data[-12:])

    def __send_channel(self, tty: serial.Serial, channel: int) -> None:
        """Write the switch command for a zero-based channel and flush the port."""

        assert 0 <= channel <= 3
        if self.__protocol == 1:
            template = "SW{port}\r\nAG{port:02d}gA"
        else:
            template = "G{port:02d}gA\x00"
        payload = template.format(port=(channel + 1)).encode()
        tty.write(payload)
        tty.flush()

    def __str__(self) -> str:
        return f"XH-HK4401({self._instance_name})"

    __repr__ = __str__
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment