Last active
April 21, 2022 20:05
-
-
Save symbioquine/00b3884b2d41ce69edb3547112706289 to your computer and use it in GitHub Desktop.
Proxying MT50 and Epever Tracer with pymodbus & epsolar_tracer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
import logging, _thread, threading, traceback, queue, subprocess | |
import sys, time, logging, glob | |
from epsolar_tracer.client import EPsolarTracerClient | |
from epsolar_tracer.enums.RegisterTypeEnum import RegisterTypeEnum | |
from epsolar_tracer.registers import coils, registers | |
from pymodbus.client.sync import ModbusSerialClient as ModbusClient | |
from pymodbus.pdu import ExceptionResponse, ModbusExceptions as merror | |
from pymodbus.bit_read_message import ReadDiscreteInputsRequest, ReadDiscreteInputsResponse | |
from pymodbus.bit_write_message import WriteMultipleCoilsRequest | |
from pymodbus.register_write_message import WriteMultipleRegistersRequest | |
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext | |
from pymodbus.datastore import ModbusSparseDataBlock | |
from pymodbus.datastore.store import BaseModbusDataBlock | |
from pymodbus.device import ModbusDeviceIdentification | |
from pymodbus.interfaces import IModbusSlaveContext | |
from pymodbus.server.sync import StartSerialServer | |
from pymodbus.transaction import ModbusRtuFramer | |
from pymodbus.exceptions import ModbusIOException | |
from custom_modbus_commands import * | |
import logging | |
from pymodbus.client.common import ModbusClientMixin | |
from pymodbus.mei_message import ReadDeviceInformationResponse | |
FORMAT = ('%(asctime)-15s %(threadName)-15s' | |
' %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s') | |
logging.basicConfig(format=FORMAT) | |
log = logging.getLogger() | |
log.setLevel(logging.DEBUG) | |
_logger = logging.getLogger(__name__) | |
def execute(cmd, *args, **kwargs): | |
return subprocess.check_output(cmd.format(*args, **kwargs), stderr=subprocess.STDOUT, shell=True) | |
def infer_tracer_and_mt50_tty_ports(): | |
usb_tty_ports = glob.glob("/dev/ttyUSB*") | |
if len(usb_tty_ports) < 1 or len(usb_tty_ports) > 2: | |
print('Error: Found more than 2 usb tty ports: '.format(usb_tty_ports)) | |
sys.exit(1) | |
for idx, usb_tty_port in enumerate(usb_tty_ports): | |
modbus_client = None | |
try: | |
# Go back to allowing EPsolarTracerClient to initialize this once https://github.com/Salamek/epsolar-tracer/pull/3/files is merged | |
modbus_client = ModbusClient(method='rtu', port=usb_tty_port, timeout=0.5, bytesize=8, stopbits=1, baudrate=115200, parity='N') | |
client = EPsolarTracerClient(serialclient=modbus_client) | |
if not client.connect(): | |
continue | |
device_info = client.read_device_info() | |
if not isinstance(device_info, ReadDeviceInformationResponse): | |
continue | |
print("Found {} on port {}".format(repr(list(device_info.information.values())), usb_tty_port)) | |
if len(usb_tty_ports) >= 2: | |
if idx == 0: | |
return usb_tty_ports[0], usb_tty_ports[1] | |
else: | |
return usb_tty_ports[1], usb_tty_ports[0] | |
else: | |
return usb_tty_port, None | |
finally: | |
modbus_client.close() | |
print("Error: Failed to infer client/server tty usb ports from {}".format(usb_tty_ports)) | |
sys.exit(1) | |
client_port, server_port = infer_tracer_and_mt50_tty_ports() | |
class ClientDelegatingSlaveContext(IModbusSlaveContext): | |
def __init__(self, delegate_client): | |
self.delegate_client = delegate_client | |
def __str__(self): | |
return "ClientDelegatingSlaveContext" | |
def reset(self): | |
pass | |
def validate(self, fx, address, count=1): | |
return True | |
def getValues(self, fx, address, count=1): | |
start = time.time() | |
try: | |
if fx == 0x2: | |
response = self.delegate_client.execute(ReadDiscreteInputsRequest(address=address, count=count, unit=1)) | |
if not type(response) == ReadDiscreteInputsResponse: | |
raise Exception("Got unexpected response type for ReadDiscreteInputsRequest: {}".format(response)) | |
values = list(map(response.getBit, range(count))) | |
return values | |
elif fx == 0x43: | |
response = self.delegate_client.execute(ReadSparseRegistersRequest(address=address, count=count, unit=1)) | |
if not type(response) == ReadSparseRegistersResponse: | |
raise Exception("Got unexpected response type for ReadSparseRegistersRequest: {}".format(response)) | |
return response.registers | |
else: | |
_logger.warning("ClientDelegatingSlaveContext.getValues[0x%x] address: 0x%x, count: %d" % (fx, address, count)) | |
raise Exception("Got unexpected function request: 0x%x" % (fx)) | |
finally: | |
end = time.time() | |
def setValues(self, fx, address, values): | |
if fx == 0xf: | |
response = self.delegate_client.execute(WriteMultipleCoilsRequest(address=address, values=values, unit=1)) | |
elif fx == 0x10: | |
response = self.delegate_client.execute(WriteMultipleRegistersRequest(address=address, values=values, unit=1)) | |
else: | |
_logger.warning("ClientDelegatingSlaveContext.setValues[0x%x] address: 0x%x, values: %s" % (fx, address, repr(values))) | |
def register(self, fc, fx, datablock=None): | |
raise NotImplementedException("This context only knows how to delegate. No data blocks can be registered.") | |
class ClientThreadCommand(object): | |
def __init__(self, request): | |
self.request = request | |
self.response = None | |
self.condition = threading.Semaphore(value=0) | |
def fulfill(self, response): | |
self.response = response | |
self.condition.release() | |
class ClientThread(threading.Thread, ModbusClientMixin): | |
def __init__(self, delegate_client): | |
threading.Thread.__init__(self, daemon=True) | |
self.delegate_client = delegate_client | |
self.command_queue = queue.Queue(maxsize=2) | |
def run(self): | |
while True: | |
cmd_start = None | |
start = time.time() | |
try: | |
cmd = self.command_queue.get(block=True, timeout=None) | |
cmd_start = time.time() | |
response = self.delegate_client.execute(cmd.request) | |
if (time.time() - cmd_start) > 0.25: | |
print("Response for command which took >0.25s:", type(response), response) | |
cmd.fulfill(response) | |
except KeyboardInterrupt: | |
sys.exit(0) | |
except Exception as ex: | |
print("Failed to execute command. Unknown error...", traceback.format_exc()) | |
cmd.fulfill(Exception("Failed to execute command. Unknown error...", ex)) | |
finally: | |
end = time.time() | |
# if cmd_start: | |
# print("ClientThread.run; loop iteration time:", end - start, "cmd execution time:", end - cmd_start) | |
# else: | |
# print("ClientThread.run; loop iteration time:", end - start) | |
def execute(self, request): | |
try: | |
cmd = ClientThreadCommand(request) | |
self.command_queue.put(cmd, block=True, timeout=0.25) | |
if cmd.condition.acquire(blocking=True, timeout=0.25): | |
if isinstance(cmd.response, BaseException): | |
raise cmd.response | |
return cmd.response | |
print("Could not execute command. Condition timeout...", request) | |
raise Exception("Could not execute command. Condition timeout...") | |
except queue.Full: | |
raise Exception("Could not execute command. Queue full...") | |
except Exception as ex: | |
raise Exception("Could not execute command. Unknown error...", ex) | |
def connect(self): | |
pass | |
def close(self): | |
pass | |
def is_socket_open(self): | |
return self.delegate_client.is_socket_open() | |
def main(): | |
# Go back to allowing EPsolarTracerClient to initialize this once https://github.com/Salamek/epsolar-tracer/pull/3/files is merged | |
modbus_client = ModbusClient(method='rtu', port=client_port, timeout=0.5, bytesize=8, stopbits=1, baudrate=115200, parity='N') | |
client = EPsolarTracerClient(serialclient=modbus_client) | |
client.client.register(ReadSparseRegistersResponse) | |
if not client.connect(): | |
print('Connection failed to {}'.format(client_port)) | |
sys.exit(1) | |
device_info = client.read_device_info() | |
if not isinstance(device_info, ReadDeviceInformationResponse): | |
print('Could not get device info: {}'.format(device_info)) | |
sys.exit(1) | |
client_thread = ClientThread(client.client) | |
client_thread.start() | |
slave_context = ClientDelegatingSlaveContext(client_thread) | |
context = ModbusServerContext(slaves=slave_context, single=True) | |
identity = ModbusDeviceIdentification() | |
identity.VendorName = device_info.information[0] # 'EPsolar Tech co., Ltd' | |
identity.ProductCode = device_info.information[1] # 'TriRon3210' | |
identity.MajorMinorRevision = device_info.information[2] # 'V01.55+V01.22' | |
def metric_capture_fn(): | |
while True: | |
try: | |
time.sleep(5) | |
client = EPsolarTracerClient(serialclient=client_thread) | |
# Write code here that accesses the Tracer unit, if desired | |
except KeyboardInterrupt: | |
sys.exit(0) | |
except: | |
traceback.print_exc() | |
print("About to start metrics capture loop...") | |
metric_capture_thread = threading.Thread(target=metric_capture_fn, daemon=True) | |
metric_capture_thread.start() | |
print("About to start main server loop...") | |
if server_port: | |
try: | |
StartSerialServer(context, framer=ModbusRtuFramer, identity=identity, | |
port=server_port, timeout=0.1, bytesize=8, stopbits=1, baudrate=115200, parity='N', | |
custom_functions=[ | |
LoginRequest, | |
ReadSparseRegistersRequest, | |
]) | |
except KeyboardInterrupt: | |
sys.exit(0) | |
metric_capture_thread.join() | |
client_thread.join() | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct, math | |
from bitstring import BitArray | |
from pymodbus.pdu import ModbusRequest, ModbusResponse, ModbusExceptions as merror | |
from pymodbus.compat import int2byte, byte2int | |
from pymodbus.utilities import hexlify_packets | |
from pymodbus.transaction import ModbusRtuFramer | |
class LoginRequest(ModbusRequest): | |
function_code = 0x41 | |
@classmethod | |
def calculateRtuFrameSize(cls, buffer): | |
''' Calculates the size of the message | |
:param buffer: A buffer containing the data that have been received. | |
:returns: The number of bytes in the response. | |
''' | |
username_length = byte2int(buffer[3]) | |
return 4 + username_length + 6 + 2 | |
def __init__(self, username=None, pin=None, **kwargs): | |
''' Initializes a new instance | |
:param address: The address to start the read from | |
:param count: The number of registers to read | |
''' | |
ModbusRequest.__init__(self, **kwargs) | |
self.username = username | |
self.pin = pin | |
def encode(self): | |
''' Encodes the request packet | |
:return: The encoded packet | |
''' | |
username_bytes = self.username.encode('utf-8') | |
return struct.pack('>B{}p6s'.format(len(username_bytes)+1), 0x0, username_bytes, self.pin.encode('utf-8')) | |
def decode(self, data): | |
''' Decode a register request packet | |
:param data: The request to decode | |
''' | |
_, self.username, self.pin = struct.unpack('>B5p6s', data) | |
def __str__(self): | |
''' Returns a string representation of the instance | |
:returns: A string representation of the instance | |
''' | |
return "LoginRequest ({}, {})".format(self.username, self.pin) | |
def execute(self, context): | |
''' Run a read holding request against a datastore | |
:param context: The datastore to request from | |
:returns: An initialized response, exception message otherwise | |
''' | |
print("execute:", self) | |
if self.username == b'user' and self.pin == b'000000': | |
return LoginSuccessResponse() | |
return self.doException(merror.IllegalValue) | |
class LoginSuccessResponse(ModbusResponse): | |
function_code = 0x41 | |
_rtu_frame_size = 6 | |
def __init__(self, **kwargs): | |
''' Initializes a new instance | |
:param values: The values to write to | |
''' | |
ModbusResponse.__init__(self, **kwargs) | |
def encode(self): | |
''' Encodes the response packet | |
:returns: The encoded packet | |
''' | |
return struct.pack('>H', 0x1) | |
def decode(self, data): | |
''' Decode a register response packet | |
:param data: The request to decode | |
''' | |
print("decode:", data) | |
def __str__(self): | |
''' Returns a string representation of the instance | |
:returns: A string representation of the instance | |
''' | |
return "LoginSuccessResponse ()" | |
class ReadSparseRegistersRequest(ModbusRequest): | |
function_code = 0x43 | |
_rtu_frame_size = 8 | |
def __init__(self, address=None, count=None, **kwargs): | |
''' Initializes a new instance | |
:param address: The address to start the read from | |
:param count: The number of registers to read | |
''' | |
ModbusRequest.__init__(self, **kwargs) | |
self.address = address | |
self.count = count | |
def encode(self): | |
''' Encodes the request packet | |
:return: The encoded packet | |
''' | |
return struct.pack('>HH', self.address, self.count) | |
def decode(self, data): | |
''' Decode a register request packet | |
:param data: The request to decode | |
''' | |
self.address, self.count = struct.unpack('>HH', data) | |
def __str__(self): | |
''' Returns a string representation of the instance | |
:returns: A string representation of the instance | |
''' | |
return "ReadSparseRegistersRequest (%d,%d)" % (self.address, self.count) | |
def execute(self, context): | |
''' Run a read holding request against a datastore | |
:param context: The datastore to request from | |
:returns: An initialized response, exception message otherwise | |
''' | |
if not (1 <= self.count <= 0x7d): | |
return self.doException(merror.IllegalValue) | |
values = context.getValues(fx=self.function_code, address=self.address, count=self.count) | |
return ReadSparseRegistersResponse(values) | |
class ReadSparseRegistersResponse(ModbusResponse): | |
function_code = 0x43 | |
_rtu_byte_count_pos = 2 | |
@classmethod | |
def calculateRtuFrameSize(cls, buffer): | |
''' Calculates the size of the message | |
:param buffer: A buffer containing the data that have been received. | |
:returns: The number of bytes in the response. | |
''' | |
byte_count = byte2int(buffer[2]) | |
bit_mask_bytes = math.ceil(byte_count / 16) | |
bit_mask = BitArray(bytes=buffer[3:3+bit_mask_bytes]) | |
actual_bit_count = bit_mask.count(True) * 2 | |
return 5 + bit_mask_bytes + actual_bit_count | |
def __init__(self, values=None, **kwargs): | |
''' Initializes a new instance | |
:param values: The values to write to | |
''' | |
ModbusResponse.__init__(self, **kwargs) | |
self.registers = values or [] | |
def encode(self): | |
''' Encodes the response packet | |
:returns: The encoded packet | |
''' | |
byte_count = len(self.registers) * 2 | |
register_count = int(byte_count / 2) | |
bit_mask_bytes = math.ceil(register_count / 8) | |
mask_bits = bit_mask_bytes * 8 | |
bit_mask = BitArray(length=mask_bits) | |
data_bytes = b'' | |
for register_idx, register in enumerate(self.registers): | |
if register is None: | |
continue | |
data_bytes += struct.pack('>H', register) | |
bit_mask[mask_bits - register_idx - 1] = True | |
return int2byte(byte_count) + bit_mask.bytes + data_bytes | |
def decode(self, data): | |
''' Decode a register response packet | |
:param data: The request to decode | |
''' | |
byte_count = byte2int(data[0]) | |
register_count = int(byte_count / 2) | |
bit_mask_bytes = math.ceil(register_count / 8) | |
mask_bits = bit_mask_bytes * 8 | |
bit_mask = BitArray(bytes=data[1:1+bit_mask_bytes]) | |
self.registers = [] | |
i = 1 + bit_mask_bytes | |
for register_idx in range(register_count): | |
mask_value = bit_mask[mask_bits - register_idx - 1] | |
if not mask_value: | |
self.registers.append(None) | |
continue | |
self.registers.append(struct.unpack('>H', data[i:i + 2])[0]) | |
i += 2 | |
def getRegister(self, index): | |
''' Get the requested register | |
:param index: The indexed register to retrieve | |
:returns: The request register | |
''' | |
return self.registers[index] | |
def __str__(self): | |
''' Returns a string representation of the instance | |
:returns: A string representation of the instance | |
''' | |
return "ReadSparseRegistersResponse (%d)" % len(self.registers) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
MIT License | |
Copyright (c) 2021 symbioquine | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[tool.poetry] | |
name = "epever_modbus_testing" | |
version = "0.1.0" | |
description = "" | |
authors = ["Your Name <you@example.com>"] | |
[tool.poetry.dependencies] | |
python = "^3.8" | |
pymodbus = "^2.5.2" | |
epsolar-tracer = "^0.0.17" | |
bitstring = "^3.1.7" | |
[tool.poetry.dev-dependencies] | |
[build-system] | |
requires = ["poetry>=0.12"] | |
build-backend = "poetry.masonry.api" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment