-
-
Save mtovts/9d06c7e7d2dd4e1806528039d5d95dd3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copy
from typing import Optional

from pymodbus.client.sync import ModbusSerialClient

from mycodo.inputs.base_input import AbstractInput
measurements_dict = { | |
0: { | |
'measurement': 'temperature', | |
'unit': 'C', | |
}, | |
# 1: { | |
# 'measurement': 'humidity', | |
# 'unit': 'percent' | |
# }, | |
# 2: { | |
# 'measurement': 'dewpoint', | |
# 'unit': 'C' | |
# } | |
} | |
INPUT_INFORMATION = { | |
'input_name_unique': 'modbusSHT21', | |
'input_manufacturer': 'later', | |
'input_name': 'ModbusRTUSHT21', | |
'input_library': 'pymodbus', | |
'measurements_name': 'Temperature', | |
'measurements_dict': measurements_dict, | |
'url_manufacturer': 'later', | |
'url_datasheet': 'later', | |
'url_product_purchase': 'later', | |
'dependencies_module': [ | |
('pip-pypi', 'pymodbus', 'pymodbus==2.5.3'), | |
], | |
'interfaces': ['UART'], | |
'uart_location': '/dev/ttyAMA0', | |
'uart_baud_rate': 9600, | |
'period': 15, # Float | |
'options_enabled': [ | |
'uart_location', | |
'period', | |
'uart_baud_rate', | |
'measurements_select', | |
'pre_output', | |
], | |
'options_disabled': ['interface'] | |
} | |
class ModbusInputModule(AbstractInput):
    """Input that reads a single temperature register over Modbus RTU.

    The serial client is created from the ``uart_location`` and
    ``uart_baud_rate`` attributes of ``self.input_dev`` (8 data bits, no
    parity, 1 stop bit, 1 s timeout).
    """

    # Holding register address and slave unit id queried for the temperature.
    TEMPERATURE_REGISTER = 1
    SLAVE_UNIT = 0x01

    def __init__(self, input_dev: dict, testing=True):
        """Set up the base input and, unless ``testing``, open the serial client.

        :param input_dev: input configuration passed through to AbstractInput.
            NOTE(review): it is later read by attribute
            (``self.input_dev.uart_location``), so the ``dict`` annotation
            looks inaccurate -- confirm against the caller.
        :param testing: when true, the serial client is not created.
        """
        super(ModbusInputModule, self).__init__(input_dev, testing=testing, name=__name__)

        self.sensor: Optional[ModbusSerialClient] = None

        if not testing:
            self.initialize_input()

    def initialize_input(self):
        """Create the Modbus RTU serial client and try to connect.

        Exceptions are logged rather than propagated. If the constructor
        raises, ``self.sensor`` stays ``None``.
        """
        try:
            self.sensor = ModbusSerialClient(
                method='rtu',
                port=self.input_dev.uart_location,
                parity='N',
                baudrate=self.input_dev.uart_baud_rate,
                bytesize=8,
                stopbits=1,
                timeout=1,
            )
            connected = self.sensor.connect()
        except Exception:
            self.logger.exception("Exception encountered while initializing input!")
            return

        if connected:
            self.logger.info("Sensor initialized without causing an exception!")
        else:
            # connect() reports failure through its return value instead of
            # raising, so a missing/busy port would otherwise look like success.
            # The client is kept so that later reads can still be attempted.
            self.logger.error(
                "Could not open Modbus serial port {}".format(
                    self.input_dev.uart_location))

    def get_measurement(self):
        """Read the temperature register and store it in channel 0.

        :return: the populated measurement dict on success; ``None`` when the
            client was never created or the read fails (the failure is logged).
        """
        self.return_dict = copy.deepcopy(measurements_dict)

        if self.sensor is None:
            self.logger.error("Sensor not set up")
            return None

        try:
            response = self.sensor.read_holding_registers(
                address=self.TEMPERATURE_REGISTER, count=1, unit=self.SLAVE_UNIT
            )
            if response.isError():
                raise EnvironmentError(
                    "Modbus error response: {}".format(response))
            if not response.registers:
                raise EnvironmentError("Modbus response contained no registers")

            # registers is a list; exactly one register was requested, so
            # store that value rather than the list itself.
            # NOTE(review): the raw register value is stored unscaled --
            # confirm against the device datasheet whether it needs scaling.
            temperature_c = response.registers[0]

            self.logger.debug(
                "Value returned from the sensor library: {}. Saving to database.".format(
                    temperature_c))

            self.value_set(0, temperature_c)

            return self.return_dict
        except Exception as msg:
            self.logger.exception("Input read failure: {}".format(msg))

        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment