Skip to content

Instantly share code, notes, and snippets.

@symbioquine
Last active April 21, 2022 20:05
Show Gist options
  • Save symbioquine/00b3884b2d41ce69edb3547112706289 to your computer and use it in GitHub Desktop.
Save symbioquine/00b3884b2d41ce69edb3547112706289 to your computer and use it in GitHub Desktop.
Proxying MT50 and Epever Tracer with pymodbus & epsolar_tracer
#!/bin/env python3
import logging, _thread, threading, traceback, queue, subprocess
import sys, time, logging, glob
from epsolar_tracer.client import EPsolarTracerClient
from epsolar_tracer.enums.RegisterTypeEnum import RegisterTypeEnum
from epsolar_tracer.registers import coils, registers
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
from pymodbus.pdu import ExceptionResponse, ModbusExceptions as merror
from pymodbus.bit_read_message import ReadDiscreteInputsRequest, ReadDiscreteInputsResponse
from pymodbus.bit_write_message import WriteMultipleCoilsRequest
from pymodbus.register_write_message import WriteMultipleRegistersRequest
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.datastore import ModbusSparseDataBlock
from pymodbus.datastore.store import BaseModbusDataBlock
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.interfaces import IModbusSlaveContext
from pymodbus.server.sync import StartSerialServer
from pymodbus.transaction import ModbusRtuFramer
from pymodbus.exceptions import ModbusIOException
from custom_modbus_commands import *
import logging
from pymodbus.client.common import ModbusClientMixin
from pymodbus.mei_message import ReadDeviceInformationResponse
FORMAT = ('%(asctime)-15s %(threadName)-15s'
' %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s')
logging.basicConfig(format=FORMAT)
log = logging.getLogger()
log.setLevel(logging.DEBUG)
_logger = logging.getLogger(__name__)
def execute(cmd, *args, **kwargs):
return subprocess.check_output(cmd.format(*args, **kwargs), stderr=subprocess.STDOUT, shell=True)
def infer_tracer_and_mt50_tty_ports():
usb_tty_ports = glob.glob("/dev/ttyUSB*")
if len(usb_tty_ports) < 1 or len(usb_tty_ports) > 2:
print('Error: Found more than 2 usb tty ports: '.format(usb_tty_ports))
sys.exit(1)
for idx, usb_tty_port in enumerate(usb_tty_ports):
modbus_client = None
try:
# Go back to allowing EPsolarTracerClient to initialize this once https://github.com/Salamek/epsolar-tracer/pull/3/files is merged
modbus_client = ModbusClient(method='rtu', port=usb_tty_port, timeout=0.5, bytesize=8, stopbits=1, baudrate=115200, parity='N')
client = EPsolarTracerClient(serialclient=modbus_client)
if not client.connect():
continue
device_info = client.read_device_info()
if not isinstance(device_info, ReadDeviceInformationResponse):
continue
print("Found {} on port {}".format(repr(list(device_info.information.values())), usb_tty_port))
if len(usb_tty_ports) >= 2:
if idx == 0:
return usb_tty_ports[0], usb_tty_ports[1]
else:
return usb_tty_ports[1], usb_tty_ports[0]
else:
return usb_tty_port, None
finally:
modbus_client.close()
print("Error: Failed to infer client/server tty usb ports from {}".format(usb_tty_ports))
sys.exit(1)
client_port, server_port = infer_tracer_and_mt50_tty_ports()
class ClientDelegatingSlaveContext(IModbusSlaveContext):
def __init__(self, delegate_client):
self.delegate_client = delegate_client
def __str__(self):
return "ClientDelegatingSlaveContext"
def reset(self):
pass
def validate(self, fx, address, count=1):
return True
def getValues(self, fx, address, count=1):
start = time.time()
try:
if fx == 0x2:
response = self.delegate_client.execute(ReadDiscreteInputsRequest(address=address, count=count, unit=1))
if not type(response) == ReadDiscreteInputsResponse:
raise Exception("Got unexpected response type for ReadDiscreteInputsRequest: {}".format(response))
values = list(map(response.getBit, range(count)))
return values
elif fx == 0x43:
response = self.delegate_client.execute(ReadSparseRegistersRequest(address=address, count=count, unit=1))
if not type(response) == ReadSparseRegistersResponse:
raise Exception("Got unexpected response type for ReadSparseRegistersRequest: {}".format(response))
return response.registers
else:
_logger.warning("ClientDelegatingSlaveContext.getValues[0x%x] address: 0x%x, count: %d" % (fx, address, count))
raise Exception("Got unexpected function request: 0x%x" % (fx))
finally:
end = time.time()
def setValues(self, fx, address, values):
if fx == 0xf:
response = self.delegate_client.execute(WriteMultipleCoilsRequest(address=address, values=values, unit=1))
elif fx == 0x10:
response = self.delegate_client.execute(WriteMultipleRegistersRequest(address=address, values=values, unit=1))
else:
_logger.warning("ClientDelegatingSlaveContext.setValues[0x%x] address: 0x%x, values: %s" % (fx, address, repr(values)))
def register(self, fc, fx, datablock=None):
raise NotImplementedException("This context only knows how to delegate. No data blocks can be registered.")
class ClientThreadCommand(object):
def __init__(self, request):
self.request = request
self.response = None
self.condition = threading.Semaphore(value=0)
def fulfill(self, response):
self.response = response
self.condition.release()
class ClientThread(threading.Thread, ModbusClientMixin):
def __init__(self, delegate_client):
threading.Thread.__init__(self, daemon=True)
self.delegate_client = delegate_client
self.command_queue = queue.Queue(maxsize=2)
def run(self):
while True:
cmd_start = None
start = time.time()
try:
cmd = self.command_queue.get(block=True, timeout=None)
cmd_start = time.time()
response = self.delegate_client.execute(cmd.request)
if (time.time() - cmd_start) > 0.25:
print("Response for command which took >0.25s:", type(response), response)
cmd.fulfill(response)
except KeyboardInterrupt:
sys.exit(0)
except Exception as ex:
print("Failed to execute command. Unknown error...", traceback.format_exc())
cmd.fulfill(Exception("Failed to execute command. Unknown error...", ex))
finally:
end = time.time()
# if cmd_start:
# print("ClientThread.run; loop iteration time:", end - start, "cmd execution time:", end - cmd_start)
# else:
# print("ClientThread.run; loop iteration time:", end - start)
def execute(self, request):
try:
cmd = ClientThreadCommand(request)
self.command_queue.put(cmd, block=True, timeout=0.25)
if cmd.condition.acquire(blocking=True, timeout=0.25):
if isinstance(cmd.response, BaseException):
raise cmd.response
return cmd.response
print("Could not execute command. Condition timeout...", request)
raise Exception("Could not execute command. Condition timeout...")
except queue.Full:
raise Exception("Could not execute command. Queue full...")
except Exception as ex:
raise Exception("Could not execute command. Unknown error...", ex)
def connect(self):
pass
def close(self):
pass
def is_socket_open(self):
return self.delegate_client.is_socket_open()
def main():
# Go back to allowing EPsolarTracerClient to initialize this once https://github.com/Salamek/epsolar-tracer/pull/3/files is merged
modbus_client = ModbusClient(method='rtu', port=client_port, timeout=0.5, bytesize=8, stopbits=1, baudrate=115200, parity='N')
client = EPsolarTracerClient(serialclient=modbus_client)
client.client.register(ReadSparseRegistersResponse)
if not client.connect():
print('Connection failed to {}'.format(client_port))
sys.exit(1)
device_info = client.read_device_info()
if not isinstance(device_info, ReadDeviceInformationResponse):
print('Could not get device info: {}'.format(device_info))
sys.exit(1)
client_thread = ClientThread(client.client)
client_thread.start()
slave_context = ClientDelegatingSlaveContext(client_thread)
context = ModbusServerContext(slaves=slave_context, single=True)
identity = ModbusDeviceIdentification()
identity.VendorName = device_info.information[0] # 'EPsolar Tech co., Ltd'
identity.ProductCode = device_info.information[1] # 'TriRon3210'
identity.MajorMinorRevision = device_info.information[2] # 'V01.55+V01.22'
def metric_capture_fn():
while True:
try:
time.sleep(5)
client = EPsolarTracerClient(serialclient=client_thread)
# Write code here that accesses the Tracer unit, if desired
except KeyboardInterrupt:
sys.exit(0)
except:
traceback.print_exc()
print("About to start metrics capture loop...")
metric_capture_thread = threading.Thread(target=metric_capture_fn, daemon=True)
metric_capture_thread.start()
print("About to start main server loop...")
if server_port:
try:
StartSerialServer(context, framer=ModbusRtuFramer, identity=identity,
port=server_port, timeout=0.1, bytesize=8, stopbits=1, baudrate=115200, parity='N',
custom_functions=[
LoginRequest,
ReadSparseRegistersRequest,
])
except KeyboardInterrupt:
sys.exit(0)
metric_capture_thread.join()
client_thread.join()
if __name__ == "__main__":
main()
import struct, math
from bitstring import BitArray
from pymodbus.pdu import ModbusRequest, ModbusResponse, ModbusExceptions as merror
from pymodbus.compat import int2byte, byte2int
from pymodbus.utilities import hexlify_packets
from pymodbus.transaction import ModbusRtuFramer
class LoginRequest(ModbusRequest):
function_code = 0x41
@classmethod
def calculateRtuFrameSize(cls, buffer):
''' Calculates the size of the message
:param buffer: A buffer containing the data that have been received.
:returns: The number of bytes in the response.
'''
username_length = byte2int(buffer[3])
return 4 + username_length + 6 + 2
def __init__(self, username=None, pin=None, **kwargs):
''' Initializes a new instance
:param address: The address to start the read from
:param count: The number of registers to read
'''
ModbusRequest.__init__(self, **kwargs)
self.username = username
self.pin = pin
def encode(self):
''' Encodes the request packet
:return: The encoded packet
'''
username_bytes = self.username.encode('utf-8')
return struct.pack('>B{}p6s'.format(len(username_bytes)+1), 0x0, username_bytes, self.pin.encode('utf-8'))
def decode(self, data):
''' Decode a register request packet
:param data: The request to decode
'''
_, self.username, self.pin = struct.unpack('>B5p6s', data)
def __str__(self):
''' Returns a string representation of the instance
:returns: A string representation of the instance
'''
return "LoginRequest ({}, {})".format(self.username, self.pin)
def execute(self, context):
''' Run a read holding request against a datastore
:param context: The datastore to request from
:returns: An initialized response, exception message otherwise
'''
print("execute:", self)
if self.username == b'user' and self.pin == b'000000':
return LoginSuccessResponse()
return self.doException(merror.IllegalValue)
class LoginSuccessResponse(ModbusResponse):
function_code = 0x41
_rtu_frame_size = 6
def __init__(self, **kwargs):
''' Initializes a new instance
:param values: The values to write to
'''
ModbusResponse.__init__(self, **kwargs)
def encode(self):
''' Encodes the response packet
:returns: The encoded packet
'''
return struct.pack('>H', 0x1)
def decode(self, data):
''' Decode a register response packet
:param data: The request to decode
'''
print("decode:", data)
def __str__(self):
''' Returns a string representation of the instance
:returns: A string representation of the instance
'''
return "LoginSuccessResponse ()"
class ReadSparseRegistersRequest(ModbusRequest):
function_code = 0x43
_rtu_frame_size = 8
def __init__(self, address=None, count=None, **kwargs):
''' Initializes a new instance
:param address: The address to start the read from
:param count: The number of registers to read
'''
ModbusRequest.__init__(self, **kwargs)
self.address = address
self.count = count
def encode(self):
''' Encodes the request packet
:return: The encoded packet
'''
return struct.pack('>HH', self.address, self.count)
def decode(self, data):
''' Decode a register request packet
:param data: The request to decode
'''
self.address, self.count = struct.unpack('>HH', data)
def __str__(self):
''' Returns a string representation of the instance
:returns: A string representation of the instance
'''
return "ReadSparseRegistersRequest (%d,%d)" % (self.address, self.count)
def execute(self, context):
''' Run a read holding request against a datastore
:param context: The datastore to request from
:returns: An initialized response, exception message otherwise
'''
if not (1 <= self.count <= 0x7d):
return self.doException(merror.IllegalValue)
values = context.getValues(fx=self.function_code, address=self.address, count=self.count)
return ReadSparseRegistersResponse(values)
class ReadSparseRegistersResponse(ModbusResponse):
function_code = 0x43
_rtu_byte_count_pos = 2
@classmethod
def calculateRtuFrameSize(cls, buffer):
''' Calculates the size of the message
:param buffer: A buffer containing the data that have been received.
:returns: The number of bytes in the response.
'''
byte_count = byte2int(buffer[2])
bit_mask_bytes = math.ceil(byte_count / 16)
bit_mask = BitArray(bytes=buffer[3:3+bit_mask_bytes])
actual_bit_count = bit_mask.count(True) * 2
return 5 + bit_mask_bytes + actual_bit_count
def __init__(self, values=None, **kwargs):
''' Initializes a new instance
:param values: The values to write to
'''
ModbusResponse.__init__(self, **kwargs)
self.registers = values or []
def encode(self):
''' Encodes the response packet
:returns: The encoded packet
'''
byte_count = len(self.registers) * 2
register_count = int(byte_count / 2)
bit_mask_bytes = math.ceil(register_count / 8)
mask_bits = bit_mask_bytes * 8
bit_mask = BitArray(length=mask_bits)
data_bytes = b''
for register_idx, register in enumerate(self.registers):
if register is None:
continue
data_bytes += struct.pack('>H', register)
bit_mask[mask_bits - register_idx - 1] = True
return int2byte(byte_count) + bit_mask.bytes + data_bytes
def decode(self, data):
''' Decode a register response packet
:param data: The request to decode
'''
byte_count = byte2int(data[0])
register_count = int(byte_count / 2)
bit_mask_bytes = math.ceil(register_count / 8)
mask_bits = bit_mask_bytes * 8
bit_mask = BitArray(bytes=data[1:1+bit_mask_bytes])
self.registers = []
i = 1 + bit_mask_bytes
for register_idx in range(register_count):
mask_value = bit_mask[mask_bits - register_idx - 1]
if not mask_value:
self.registers.append(None)
continue
self.registers.append(struct.unpack('>H', data[i:i + 2])[0])
i += 2
def getRegister(self, index):
''' Get the requested register
:param index: The indexed register to retrieve
:returns: The request register
'''
return self.registers[index]
def __str__(self):
''' Returns a string representation of the instance
:returns: A string representation of the instance
'''
return "ReadSparseRegistersResponse (%d)" % len(self.registers)
MIT License
Copyright (c) 2021 symbioquine
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
[tool.poetry]
name = "epever_modbus_testing"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = "^3.8"
pymodbus = "^2.5.2"
epsolar-tracer = "^0.0.17"
bitstring = "^3.1.7"
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment