Skip to content

Instantly share code, notes, and snippets.

@mtovts
Created October 24, 2021 16:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mtovts/9d06c7e7d2dd4e1806528039d5d95dd3 to your computer and use it in GitHub Desktop.
Save mtovts/9d06c7e7d2dd4e1806528039d5d95dd3 to your computer and use it in GitHub Desktop.
import copy
from pymodbus.client.sync import ModbusSerialClient
from mycodo.inputs.base_input import AbstractInput
measurements_dict = {
0: {
'measurement': 'temperature',
'unit': 'C',
},
# 1: {
# 'measurement': 'humidity',
# 'unit': 'percent'
# },
# 2: {
# 'measurement': 'dewpoint',
# 'unit': 'C'
# }
}
INPUT_INFORMATION = {
'input_name_unique': 'modbusSHT21',
'input_manufacturer': 'later',
'input_name': 'ModbusRTUSHT21',
'input_library': 'pymodbus',
'measurements_name': 'Temperature',
'measurements_dict': measurements_dict,
'url_manufacturer': 'later',
'url_datasheet': 'later',
'url_product_purchase': 'later',
'dependencies_module': [
('pip-pypi', 'pymodbus', 'pymodbus==2.5.3'),
],
'interfaces': ['UART'],
'uart_location': '/dev/ttyAMA0',
'uart_baud_rate': 9600,
'period': 15, # Float
'options_enabled': [
'uart_location',
'period',
'uart_baud_rate',
'measurements_select',
'pre_output',
],
'options_disabled': ['interface']
}
class ModbusInputModule(AbstractInput):
""" """
def __init__(self, input_dev: dict, testing=True):
super(ModbusInputModule, self).__init__(input_dev, testing=testing, name=__name__)
self.sensor: ModbusSerialClient = None
if not testing:
self.initialize_input()
def initialize_input(self):
try:
self.sensor = ModbusSerialClient(
method='rtu',
port=self.input_dev.uart_location,
parity='N',
baudrate=self.input_dev.uart_baud_rate,
bytesize=8,
stopbits=1,
timeout=1,
)
self.sensor.connect()
self.logger.info("Sensor initialized without causing an exception!")
except:
self.logger.exception("Exception encountered while initializing input!")
def get_measurement(self):
""" """
self.return_dict = copy.deepcopy(measurements_dict)
if not self.sensor:
self.logger.error("Sensor not set up")
return
try:
response = self.sensor.read_holding_registers(
address=1, count=1, unit=0x01
)
if response.isError():
raise EnvironmentError('')
temperature_c = response.registers
self.logger.debug(
"Value returned from the sensor library: {}. Saving to database.".format(
temperature_c))
self.value_set(0, temperature_c)
return self.return_dict
except Exception as msg:
self.logger.exception("Input read failure: {}".format(msg))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment