Skip to content

Instantly share code, notes, and snippets.

@piotrmaslanka
Last active September 20, 2020 14:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save piotrmaslanka/6b47a42618834662ef88c6ebadd53e94 to your computer and use it in GitHub Desktop.
Save piotrmaslanka/6b47a42618834662ef88c6ebadd53e94 to your computer and use it in GitHub Desktop.
[Python3] A very simple MODBUS TCP server that supports only function code 3 (Read Holding Registers) and always answers with zeros.
import socket
import struct
import threading
class SocketThread(threading.Thread):
    """Serve a single MODBUS TCP client connection on its own thread.

    Only function code 3 (Read Holding Registers) is understood, and every
    requested register is reported as zero.
    """

    def __init__(self, sock):
        """Remember the connected client socket this thread will serve.

        Args:
            sock: the already-connected client socket.
        """
        super().__init__()
        # Keep the connection that was handed in -- not the ``socket`` module.
        self.socket = sock

    def recv_all(self, length: int) -> bytearray:
        """Read exactly ``length`` bytes from the client.

        Args:
            length: number of bytes to read.

        Returns:
            A bytearray holding exactly ``length`` bytes.

        Raises:
            ValueError: if the peer closes the connection before ``length``
                bytes have arrived.
        """
        data = bytearray()
        while len(data) < length:
            # recv() may return fewer bytes than requested, so keep asking
            # for whatever is still missing.
            chunk = self.socket.recv(length - len(data))
            if not chunk:
                raise ValueError('End of connection')
            data += chunk
        return data

    def run(self):
        """Thread entry point: serve requests, then always close the socket."""
        try:
            self._run()
        finally:
            self.socket.close()

    def _run(self):
        """Answer requests in a loop until the connection ends or one is invalid.

        Raises:
            ValueError: when the peer disconnects, when the function code is
                not 3, or when the request is malformed.
        """
        while True:
            t_id = self.recv_all(2)     # Grab the transaction ID
            self.recv_all(2)            # Skip the protocol
            length, = struct.unpack('>H', self.recv_all(2))     # Get packet length
            packet = self.recv_all(length)

            # The function code is the second byte; make sure it is there.
            if len(packet) < 2:
                raise ValueError(f'Packet too short: {len(packet)} bytes')
            code = packet[1]
            if code != 3:
                raise ValueError(f'Unknown code {code}')

            # The request is address, code, starting register, register count.
            if len(packet) != 6:
                raise ValueError(f'Bad request length {len(packet)}, expected 6')
            addr, code, _starting_reg, amount_regs = struct.unpack('>BBHH', packet)

            # The byte count is sent in a single byte, so it must fit in 0..255.
            byte_count = amount_regs * 2
            if byte_count > 0xFF:
                raise ValueError(f'Too many registers requested: {amount_regs}')

            # Every register is zero, so the payload is byte_count zero bytes.
            resp = struct.pack('>BBB', addr, code, byte_count) + bytes(byte_count)
            # Echo the transaction ID, then protocol 0 and the response length.
            response = t_id + struct.pack('>HH', 0, len(resp)) + resp
            self.socket.sendall(response)
            print(f'Sent back {repr(response)}')
if __name__ == '__main__':
    # Open a TCP listening socket on every interface, port 502.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('0.0.0.0', 502))
    listener.listen(20)

    # Accept clients forever, handing each connection to its own thread.
    while True:
        connection, peer = listener.accept()
        print(f'Accepted {peer}')
        SocketThread(connection).start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment