-
-
Save jplitza/3fdbc9f24df3e75305f4b047fbc8ddae to your computer and use it in GitHub Desktop.
Basic programm to use hidapi and report temp/co2 from Holtek Semiconductor, Inc. USB-zyTemp (hidapi is python3-hidapi in debian buster)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh /etc/rc.common | |
START=50 | |
USE_PROCD=1 | |
NAME=co2mon | |
PROG=/root/co2mon/hidapi_co2mon.py | |
start_service() { | |
local url=$(uci -q get co2mon.@co2mon[0].url) | |
local temperature=$(uci -q get co2mon.@co2mon[0].temperature) | |
local co2=$(uci -q get co2mon.@co2mon[0].co2) | |
local token=$(uci -q get co2mon.@co2mon[0].token) | |
procd_open_instance | |
procd_set_param command "$PROG" \ | |
${url:+--url "$url"} \ | |
${temperature:+--temperature "$temperature"} \ | |
${co2:+--co2 "$co2"} \ | |
${token:+--token "$token"} | |
procd_set_param respawn | |
procd_set_param stdout 1 | |
procd_set_param stderr 1 | |
procd_close_instance | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) 2014, Johannes Baiter <johannes.baiter@gmail.com> | |
# All rights reserved. | |
# | |
# Redistribution and use in source and binary forms, with or without | |
# modification, are permitted provided that the following conditions are met: | |
# | |
# * Redistributions of source code must retain the above copyright notice, | |
# this list of conditions and the following disclaimer. | |
# * Redistributions in binary form must reproduce the above copyright | |
# notice, this list of conditions and the following disclaimer in the | |
# documentation and/or other materials provided with the distribution. | |
# * Neither the name of Signal 11 Software nor the names of its | |
# contributors may be used to endorse or promote products derived from | |
# this software without specific prior written permission. | |
# | |
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE | |
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
# POSSIBILITY OF SUCH DAMAGE. | |
import ffi | |
import ffilib | |
import uctypes | |
IOError = OSError | |
class HIDAPI: | |
def __init__(self, lib): | |
self._lib = ffilib.open(f'lib{lib}') | |
# int hid_init(void); | |
self.hid_init = self._lib.func('i', 'hid_init', '') | |
# int hid_exit(void); | |
self.hid_exit = self._lib.func('i', 'hid_exit', '') | |
# struct hid_device_info* hid_enumerate(unsigned short vendor_id, | |
# unsigned short product_id); | |
self.hid_enumerate = self.not_implemented | |
# void hid_free_enumeration(struct hid_device_info *devs); | |
self.hid_free_numeration = self.not_implemented | |
# hid_device* hid_open(unsigned short vendor_id, unsigned short product_id, | |
# const wchar_t *serial_number); | |
self.hid_open = self._lib.func('p', 'hid_open', 'HHP') | |
# hid_device* hid_open_path(const char *path); | |
self.hid_open_path = self._lib.func('p', 'hid_open_path', 's') | |
# int hid_write(hid_device *device, const unsigned char *data, size_t length); | |
self.hid_write = self._lib.func('i', 'hid_write', 'psI') | |
# int hid_read_timeout(hid_device *dev, unsigned char *data, size_t length, | |
# int milliseconds); | |
self.hid_read_timeout = self._lib.func('i', 'hid_read_timeout', 'PsIi') | |
# int hid_read(hid_device *device, unsigned char *data, size_t length); | |
self.hid_read = self._lib.func('i', 'hid_read', 'PpI') | |
# int hid_set_nonblocking(hid_device *device, int nonblock); | |
self.hid_set_nonblocking = self._lib.func('i', 'hid_set_nonblocking', 'Pi') | |
# int hid_send_feature_report(hid_device *device, const unsigned char *data, | |
# size_t length); | |
self.hid_send_feature_report = self._lib.func('i', 'hid_send_feature_report', 'PsI') | |
# int hid_get_feature_report(hid_device *device, unsigned char *data, | |
# size_t length); | |
self.hid_get_feature_report = self._lib.func('i', 'hid_get_feature_report', 'PpI') | |
# void hid_close(hid_device *device); | |
self.hid_close = self._lib.func('v', 'hid_close', 'P') | |
# int hid_get_manufacturer_string(hid_device *device, wchar_t *string, | |
# size_t maxlen); | |
self.hid_get_manufacturer_string = self._lib.func('i', 'hid_get_manufacturer_string', 'PsI') | |
# int hid_get_product_string(hid_device *device, wchar_t *string, | |
# size_t maxlen); | |
self.hid_get_product_string = self._lib.func('i', 'hid_get_product_string', 'PsI') | |
# int hid_get_serial_number_string(hid_device *device, wchar_t *string, | |
# size_t maxlen); | |
self.hid_get_serial_number_string = self._lib.func('i', 'hid_get_serial_number_string', 'PsI') | |
# int hid_get_indexed_string(hid_device *device, int string_index, | |
# wchar_t *string, size_t maxlen); | |
self.hid_get_indexed_string = self._lib.func('i', 'hid_get_indexed_string', 'PisI') | |
# const wchar_t* hid_error(hid_device *device); | |
self.hid_error = self._lib.func('p', 'hid_error', 'P') | |
def not_implemented(self, *args): | |
raise NotImplementedError() | |
def _wchar_to_str(wchar): | |
# this simply assumes ASCII | |
buf = "" | |
msg = "" | |
size = 0 | |
while buf[-4:] != '\0\0\0\0': | |
size += 4 | |
buf = uctypes.bytearray_at(wchar, size) | |
msg += chr(buf[-4]) | |
return msg | |
for lib in ('hidapi-libusb', 'hidapi-hidraw', 'hidapi'): | |
try: | |
hidapi = HIDAPI(lib) | |
break | |
except OSError: | |
pass | |
else: | |
raise OSError("Could not find any hidapi library") | |
if hidapi.hid_init() == -1: | |
raise OSError("Failed to initialize hidapi") | |
class DeviceInfo(object): | |
__slots__ = ['path', 'vendor_id', 'product_id', 'serial_number', | |
'release_number', 'manufacturer_string', 'product_string', | |
'usage_page', 'usage', 'interface_number'] | |
def __init__(self, info_struct): | |
#: Platform-specific device path | |
self.path = ffi.string(info_struct.path) | |
#: Device Vendor ID | |
self.vendor_id = info_struct.vendor_id | |
#: Device Product ID | |
self.product_id = info_struct.product_id | |
#: Serial Number | |
self.serial_number = (ffi.string(info_struct.serial_number) | |
if info_struct.serial_number else None) | |
#: Device Release Number in binary-coded decimal, also known as | |
# Device Version Number | |
self.release_number = info_struct.release_number | |
#: Manufacturer String | |
self.manufacturer_string = (ffi.string(info_struct.manufacturer_string) | |
if info_struct.manufacturer_string | |
else None) | |
#: Product string | |
self.product_string = (ffi.string(info_struct.product_string) | |
if info_struct.product_string else None) | |
#: Usage Page for this Device/Interface (Windows/Mac only). | |
self.usage_page = info_struct.usage_page or None | |
#: Usage for this Device/Interface (Windows/Mac only). | |
self.usage = info_struct.usage or None | |
#: The USB interface which this logical device represents. Valid on | |
# both Linux implementations in all cases, and valid on the Windows | |
# implementation only if the device contains more than one interface. | |
self.interface_number = info_struct.interface_number | |
def enumerate(vendor_id=0, product_id=0): | |
""" Enumerate the HID Devices. | |
Returns a generator that yields all of the HID devices attached to the | |
system. | |
:param vendor_id: Only return devices which match this vendor id | |
:type vendor_id: int | |
:param product_id: Only return devices which match this product id | |
:type product_id: int | |
:return: Generator that yields informations about attached | |
HID devices | |
:rval: generator(DeviceInfo) | |
""" | |
info = hidapi.hid_enumerate(vendor_id, product_id) | |
while info: | |
yield DeviceInfo(info) | |
info = info.next | |
hidapi.hid_free_enumeration(info) | |
class Device(object): | |
def __init__(self, info=None, path=None, vendor_id=None, product_id=None, | |
serial_number=None, blocking=True): | |
""" Open a connection to a HID device. | |
This can be done either from a DeviceInfo object, a device path or | |
a combination of vendor id, product id and an optional serial number. | |
If no serial number is passed, the first matching device will be | |
selected. | |
By setting :param blocking: to True, all reads will be blocking by | |
default, otherwise they will return `None` if no data is available. | |
:param info: Information about the device to initialize | |
:type info: DeviceInfo | |
:param path: Platform-specific path to the device (e.g. | |
`/dev/hidraw0` on Linux) | |
:type path: str | |
:param vendor_id: Vendor ID | |
:type vendor_id: int | |
:param product_id: Product ID | |
:type product_id: int | |
:param serial_number: Device serial number | |
:type serial_number: str | |
:param blocking: Enable blocking reads by default | |
:type blocking: boolean | |
""" | |
if info is not None: | |
self._device = hidapi.hid_open_path(info.path) | |
elif path is not None: | |
self._device = hidapi.hid_open_path(path) | |
elif not (vendor_id is None or product_id is None): | |
self._device = hidapi.hid_open(vendor_id, product_id, | |
serial_number or None) | |
else: | |
raise ValueError("Must provide either a DeviceInfo object, 'path' " | |
"or 'vendor_id' and 'product_id'.") | |
if self._device == 0: | |
raise IOError("Could not open connection to device.") | |
if not blocking: | |
hidapi.hid_set_nonblocking(self._device, 1) | |
def __del__(self): | |
if self._device is not None: | |
self.close() | |
def write(self, data, report_id=b'\0'): | |
""" Write an Output report to a HID device. | |
This will send the data on the first OUT endpoint, if one exists. If it | |
does not, it will be sent the data through the Control Endpoint | |
(Endpoint 0). | |
:param data: The data to be sent | |
:type data: str/bytes | |
:param report_id: The Report ID to write to (default: 0x0) | |
""" | |
self._check_device_status() | |
bufp = ffi.new("unsigned char[]", len(data)+1) | |
buf = ffi.buffer(bufp, len(data)+1) | |
buf[0] = report_id | |
buf[1:] = data | |
rv = hidapi.hid_write(self._device, bufp, len(data)+1) | |
if rv == -1: | |
raise IOError("Failed to write to HID device.") | |
def read(self, length, timeout_ms=0, blocking=False): | |
""" Read an Input report from a HID device with timeout. | |
Input reports are returned to the host through the `INTERRUPT IN` | |
endpoint. The first byte will contain the Report number if the device | |
uses numbered reports. | |
By default reads are non-blocking, i.e. the method will return | |
`None` if no data was available. Blocking reads can be enabled with | |
:param blocking:. Additionally, a timeout for the read can be | |
specified. | |
:param length: The number of bytes to read. For devices with | |
multiple reports, make sure to read an extra byte | |
for the report number. | |
:param timeout_ms: Timeout in miliseconds | |
:type timeout_ms: int | |
:param blocking: Block until data is available | |
""" | |
self._check_device_status() | |
bufp = bytearray(length) | |
if not timeout_ms and blocking: | |
timeout_ms = -1 | |
if timeout_ms: | |
rv = hidapi.hid_read_timeout(self._device, bufp, length, | |
timeout_ms) | |
else: | |
rv = hidapi.hid_read(self._device, bufp, length) | |
if rv == -1: | |
raise IOError("Failed to read from HID device: {0}" | |
.format(self._get_last_error_string())) | |
elif rv == 0: | |
return None | |
else: | |
return bufp | |
def get_manufacturer_string(self): | |
""" Get the Manufacturer String from the HID device. | |
:return: The Manufacturer String | |
:rtype: unicode | |
""" | |
self._check_device_status() | |
str_p = ffi.new("wchar_t[]", 255) | |
rv = hidapi.hid_get_manufacturer_string(self._device, str_p, 255) | |
if rv == -1: | |
raise IOError("Failed to read manufacturer string from HID " | |
"device: {0}".format(self._get_last_error_string())) | |
return ffi.string(str_p) | |
def get_product_string(self): | |
""" Get the Product String from the HID device. | |
:return: The Product String | |
:rtype: unicode | |
""" | |
self._check_device_status() | |
str_p = ffi.new("wchar_t[]", 255) | |
rv = hidapi.hid_get_product_string(self._device, str_p, 255) | |
if rv == -1: | |
raise IOError("Failed to read product string from HID device: {0}" | |
.format(self._get_last_error_string())) | |
return ffi.string(str_p) | |
def get_serial_number_string(self): | |
""" Get the Serial Number String from the HID device. | |
:return: The Serial Number String | |
:rtype: unicode | |
""" | |
self._check_device_status() | |
str_p = ffi.new("wchar_t[]", 255) | |
rv = hidapi.hid_get_serial_number_string(self._device, str_p, 255) | |
if rv == -1: | |
raise IOError("Failed to read serial number string from HID " | |
"device: {0}".format(self._get_last_error_string())) | |
return ffi.string(str_p) | |
def send_feature_report(self, data, report_id=0x0): | |
""" Send a Feature report to the device. | |
Feature reports are sent over the Control endpoint as a Set_Report | |
transfer. | |
:param data: The data to send | |
:type data: str/bytes | |
:param report_id: The Report ID to send to | |
:type report_id: int | |
""" | |
self._check_device_status() | |
buf = bytearray([report_id]) + data | |
rv = hidapi.hid_send_feature_report(self._device, buf, len(buf)) | |
if rv == -1: | |
raise IOError("Failed to send feature report to HID device: {0}" | |
.format(self._get_last_error_string())) | |
def get_feature_report(self, report_id, length): | |
""" Get a feature report from the device. | |
:param report_id: The Report ID of the report to be read | |
:type report_id: int | |
:return: The report data | |
:rtype: str/bytes | |
""" | |
self._check_device_status() | |
bufp = ffi.new("unsigned char[]", length+1) | |
buf = ffi.buffer(bufp, length+1) | |
buf[0] = report_id | |
rv = hidapi.hid_get_feature_report(self._device, bufp, length+1) | |
if rv == -1: | |
raise IOError("Failed to get feature report from HID device: {0}" | |
.format(self._get_last_error_string())) | |
return buf[1:] | |
def get_indexed_string(self, idx): | |
""" Get a string from the device, based on its string index. | |
:param idx: The index of the string to get | |
:type idx: int | |
:return: The string at the index | |
:rtype: unicode | |
""" | |
self._check_device_status() | |
bufp = ffi.new("wchar_t*") | |
rv = hidapi.hid_get_indexed_string(self._device, idx, bufp, 65536) | |
if rv == -1: | |
raise IOError("Failed to read string with index {0} from HID " | |
"device: {0}" | |
.format(idx, self._get_last_error_string())) | |
return ffi.buffer(bufp, 65536)[:].strip() | |
def close(self): | |
""" Close connection to HID device. | |
Automatically run when a Device object is garbage-collected, though | |
manual invocation is recommended. | |
""" | |
self._check_device_status() | |
hidapi.hid_close(self._device) | |
self._device = None | |
def _get_last_error_string(self): | |
return _wchar_to_str(hidapi.hid_error(self._device)) | |
def _check_device_status(self): | |
if self._device is None: | |
raise OSError("Trying to perform action on closed device.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/micropython | |
import argparse | |
import logging | |
try: | |
import logging.handlers | |
HAVE_LOGGING_HANDLERS = True | |
except ImportError: | |
HAVE_LOGGING_HANDLERS = False | |
try: | |
import requests | |
except ImportError: | |
import urequests as requests | |
import json | |
from hidapi import Device | |
PROGNAME = 'co2mon' | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
'--url', | |
default='https://homeassistant.local/', | |
help='Home Assistant base URL', | |
) | |
parser.add_argument( | |
'--temperature', | |
default='sensor.temperature', | |
help='Home Assistant temperature sensor name', | |
) | |
parser.add_argument( | |
'--co2', | |
default='sensor.temperature', | |
help='Home Assistant CO2 sensor name', | |
) | |
parser.add_argument( | |
'--token', | |
help='Home Assistant API token', | |
) | |
parser.add_argument( | |
'--debug', | |
action='store_true', | |
help='Set loglevel to debug', | |
) | |
args = parser.parse_args() | |
vid = 0x04d9 | |
pid = 0xa052 | |
attributes = { | |
'temperature': { | |
'state_class': 'measurement', | |
'unit_of_measurement': '\xb0C', | |
'device_class': 'temperature', | |
}, | |
'co2': { | |
'state_class': 'measurement', | |
'unit_of_measurement': 'ppm', | |
'device_class': 'carbon_dioxide', | |
}, | |
} | |
logger = logging.getLogger(PROGNAME) | |
logger.setLevel(logging.DEBUG if args.debug else logging.INFO) | |
if HAVE_LOGGING_HANDLERS: | |
syslog_handler = logging.handlers.SysLogHandler(address='/dev/log') | |
syslog_handler.setFormatter(logging.Formatter(f'{PROGNAME}[%(process)d]: %(levelname)s: %(message)s')) | |
logger.addHandler(syslog_handler) | |
logger.debug('Opening the device') | |
h = Device(vendor_id=vid, product_id=pid) | |
def send(metric_name, metric_value): | |
logger.info(f'Sending {metric_name}={metric_value}') | |
# we need to fiddle with the degree symbol | |
data = json.dumps({ | |
'state': str(metric_value), | |
'attributes': attributes[metric_name], | |
}).replace('\xb0', '\\u00b0') | |
req = requests.post( | |
f'{args.url}/api/states/{getattr(args, metric_name)}', | |
data=data, | |
headers={ | |
'Authorization': f'Bearer {args.token}', | |
'Content-Type': 'application/json', | |
}, | |
) | |
logger.debug(f'Response: {req.status_code} {req.reason}') | |
logger.debug(req.text) | |
if req.status_code >= 400 and req.status_code < 500: | |
raise RuntimeError(f'Home Assistant responded with status code {req.status_code} {req.reason}') | |
logger.debug('Sending feature report') | |
h.send_feature_report(bytearray([0xc4, 0xc6, 0xc0, 0x92, 0x40, 0x23, 0xdc, 0x96]), report_id=0) | |
logger.debug('Start reading') | |
try: | |
while True: | |
r = h.read(8) | |
if r: | |
logger.debug(f'read: "{r}"') | |
if r[4] != 0x0d: | |
logger.warning(f'Unexpected data from device: {r}') | |
continue | |
metric = r[0] | |
value = r[1] | |
value_thousands = r[2] | |
checksum = r[3] | |
if checksum != (metric + value + value_thousands) & 0xff: | |
logger.warning(f'Checksum error: {metric} + {value} + {value_thousands} != {checksum} (mod 256)') | |
continue | |
metric_name = '' | |
value_shifted = value << 8 | |
metric_value = value_shifted + value_thousands | |
if metric == 0x42: | |
metric_name = 'temperature' | |
metric_value = round(metric_value / 16 - 273.15, 1) | |
elif metric == 0x50: | |
metric_name = 'co2' | |
elif metric == 0x44: | |
metric_name = 'humidity' | |
else: | |
continue | |
send(metric_name, metric_value) | |
finally: | |
logger.debug('Closing the device') | |
h.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment