Created
April 2, 2020 14:32
-
-
Save remceTkedaR/ea180f43e718efe9236cc8dc0b4a064b to your computer and use it in GitHub Desktop.
Python - Raspberry Pi - RS485
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ----- | |
from pymodbus.client.sync import ModbusSerialClient | |
import serial.rs485 | |
ser = serial.rs485.RS485(port='/dev/ttyAMA0', baudrate=1200) | |
ser.rs485_mode = serial.rs485.RS485Settings(rts_level_for_tx=False, | |
rts_level_for_rx=True, | |
delay_before_tx=0.0, | |
delay_before_rx=-0.0) | |
client = ModbusSerialClient(method='rtu') | |
client.socket = ser | |
client.connect() | |
result = client.read_holding_registers(address=0x2000, count=2, | |
unit=1) | |
print(result) | |
print(result.registers) | |
client.close() | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# RS485 support | |
# | |
# This file is part of pySerial. https://github.com/pyserial/pyserial | |
# (C) 2015 Chris Liechti <cliechti@gmx.net> | |
# | |
# SPDX-License-Identifier: BSD-3-Clause | |
"""\ | |
The settings for RS485 are stored in a dedicated object that can be applied to | |
serial ports (where supported). | |
NOTE: Some implementations may only support a subset of the settings. | |
""" | |
from __future__ import absolute_import | |
import time | |
import serial | |
class RS485Settings(object): | |
def __init__( | |
self, | |
rts_level_for_tx=True, | |
rts_level_for_rx=False, | |
loopback=False, | |
delay_before_tx=None, | |
delay_before_rx=None): | |
self.rts_level_for_tx = rts_level_for_tx | |
self.rts_level_for_rx = rts_level_for_rx | |
self.loopback = loopback | |
self.delay_before_tx = delay_before_tx | |
self.delay_before_rx = delay_before_rx | |
class RS485(serial.Serial): | |
"""\ | |
A subclass that replaces the write method with one that toggles RTS | |
according to the RS485 settings. | |
NOTE: This may work unreliably on some serial ports (control signals not | |
synchronized or delayed compared to data). Using delays may be | |
unreliable (varying times, larger than expected) as the OS may not | |
support very fine grained delays (no smaller than in the order of | |
tens of milliseconds). | |
NOTE: Some implementations support this natively. Better performance | |
can be expected when the native version is used. | |
NOTE: The loopback property is ignored by this implementation. The actual | |
behavior depends on the used hardware. | |
Usage: | |
ser = RS485(...) | |
ser.rs485_mode = RS485Settings(...) | |
ser.write(b'hello') | |
""" | |
def __init__(self, *args, **kwargs): | |
super(RS485, self).__init__(*args, **kwargs) | |
self._alternate_rs485_settings = None | |
def write(self, b): | |
"""Write to port, controlling RTS before and after transmitting.""" | |
if self._alternate_rs485_settings is not None: | |
# apply level for TX and optional delay | |
self.setRTS(self._alternate_rs485_settings.rts_level_for_tx) | |
if self._alternate_rs485_settings.delay_before_tx is not None: | |
time.sleep(self._alternate_rs485_settings.delay_before_tx) | |
# write and wait for data to be written | |
super(RS485, self).write(b) | |
super(RS485, self).flush() | |
# optional delay and apply level for RX | |
if self._alternate_rs485_settings.delay_before_rx is not None: | |
time.sleep(self._alternate_rs485_settings.delay_before_rx) | |
self.setRTS(self._alternate_rs485_settings.rts_level_for_rx) | |
else: | |
super(RS485, self).write(b) | |
# redirect where the property stores the settings so that underlying Serial | |
# instance does not see them | |
@property | |
def rs485_mode(self): | |
"""\ | |
Enable RS485 mode and apply new settings, set to None to disable. | |
See serial.rs485.RS485Settings for more info about the value. | |
""" | |
return self._alternate_rs485_settings | |
@rs485_mode.setter | |
def rs485_mode(self, rs485_settings): | |
self._alternate_rs485_settings = rs485_settings |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import serial.rs485 | |
ser = serial.rs485.RS485(port='/dev/ttyAMA0', baudrate=2400) | |
ser.rs485_mode = serial.rs485.RS485Settings(False, True) | |
ser.write('a test'.encode('utf-8')) | |
while True: | |
c = ser.read(1) | |
ser.write(c) | |
print(c, end='') | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# This file is part of pySerial - Cross platform serial port support for Python | |
# (C) 2015 Chris Liechti <cliechti@gmx.net> | |
# | |
# SPDX-License-Identifier: BSD-3-Clause | |
"""\ | |
Test RS485 related functionality. | |
""" | |
import unittest | |
import serial | |
import serial.rs485 | |
# on which port should the tests be performed: | |
PORT = 'loop://' | |
class Test_RS485_settings(unittest.TestCase): | |
"""Test RS485 related functionality""" | |
def setUp(self): | |
# create a closed serial port | |
self.s = serial.serial_for_url(PORT, do_not_open=True) | |
def tearDown(self): | |
self.s.close() | |
def test_enable_RS485(self): | |
# XXX open() port - but will result in fail for most HW... | |
#~ self.s.open() | |
self.assertEqual(self.s._rs485_mode, None, 'RS485 is disabled by default') | |
self.assertEqual(self.s.rs485_mode, None, 'RS485 is disabled by default') | |
self.s.rs485_mode = serial.rs485.RS485Settings() | |
self.assertTrue(self.s._rs485_mode is not None, 'RS485 is enabled') | |
self.assertTrue(self.s.rs485_mode is not None, 'RS485 is enabled') | |
self.s.rs485_mode = None | |
self.assertEqual(self.s._rs485_mode, None, 'RS485 is disabled again') | |
self.assertEqual(self.s.rs485_mode, None, 'RS485 is disabled again') | |
class Test_RS485_class(unittest.TestCase): | |
"""Test RS485 class""" | |
def setUp(self): | |
if not isinstance(serial.serial_for_url(PORT), serial.Serial): | |
raise unittest.SkipTest("RS485 test only compatible with real serial port") | |
self.s = serial.rs485.RS485(PORT, timeout=1) | |
def tearDown(self): | |
self.s.close() | |
def test_RS485_class(self): | |
self.s.rs485_mode = serial.rs485.RS485Settings() | |
self.s.write(b'hello') | |
self.assertEqual(self.s.read(5), b'hello') | |
if __name__ == '__main__': | |
import sys | |
sys.stdout.write(__doc__) | |
if len(sys.argv) > 1: | |
PORT = sys.argv[1] | |
sys.stdout.write("Testing port: {!r}\n".format(PORT)) | |
sys.argv[1:] = ['-v'] | |
# When this module is executed from the command-line, it runs all its tests | |
unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment