Using pymodbus in TornadoWeb IO loop
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
'''
Pymodbus Asynchronous Client Examples -- using Tornado
--------------------------------------------------------------------------

The following is an example of how to use the asynchronous modbus
client implementation from pymodbus, with the Twisted reactor installed
on top of the Tornado IO loop.
'''
#---------------------------------------------------------------------------# | |
# import needed libraries | |
#---------------------------------------------------------------------------# | |
import tornado | |
import tornado.platform.twisted | |
tornado.platform.twisted.install() | |
from twisted.internet import reactor, protocol | |
from pymodbus.constants import Defaults | |
#---------------------------------------------------------------------------# | |
# choose the requested modbus protocol | |
#---------------------------------------------------------------------------# | |
from pymodbus.client.async import ModbusClientProtocol | |
#from pymodbus.client.async import ModbusUdpClientProtocol | |
#---------------------------------------------------------------------------# | |
# configure the client logging | |
#---------------------------------------------------------------------------# | |
import logging | |
# basicConfig() attaches a default handler to the root logger (when it has
# none yet); the root threshold is then lowered so DEBUG records are emitted.
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
#---------------------------------------------------------------------------# | |
# helper method to test deferred callbacks | |
#---------------------------------------------------------------------------# | |
def dassert(deferred, callback):
    '''
    Attach an assertion check to the eventual result of a deferred.

    :param deferred: The deferred whose result should be checked
    :param callback: Called with the result; a falsy return value is
                     treated as a failed assertion
    :returns: The same deferred, so further callbacks can be chained
    :raises AssertionError: Raised inside the callback chain (not by this
                            call itself) when the check fails or the
                            deferred reports an error
    '''
    def _check(result):
        # Raise explicitly rather than using the `assert` statement: that
        # statement is compiled away under `python -O`, which would turn
        # this helper into a silent no-op.
        if not callback(result):
            raise AssertionError(
                'deferred result failed check: %r' % (result,))

    def _fail(failure):
        # Keep the original failure in the message so the cause is not lost.
        raise AssertionError('deferred failed: %s' % (failure,))

    # The errback is registered after the callback, so it also sees an
    # error raised by the check above.
    deferred.addCallback(_check)
    deferred.addErrback(_fail)
    return deferred
#---------------------------------------------------------------------------# | |
# specify slave to query | |
#---------------------------------------------------------------------------# | |
# The slave to query is specified in an optional parameter for each | |
# individual request. This can be done by specifying the `unit` parameter | |
# which defaults to `0x00` | |
#---------------------------------------------------------------------------# | |
def exampleRequests(client):
    '''
    Issue a coil read addressed to a specific slave unit.

    The slave is selected with the optional `unit` keyword argument,
    here unit 0x02.

    :param client: The modbus client to send the request with
    :returns: Whatever `client.read_coils` returns (for the asynchronous
              client this is a deferred), so the caller can attach
              callbacks instead of the result being discarded
    '''
    return client.read_coils(1, 1, unit=0x02)
#---------------------------------------------------------------------------# | |
# example requests | |
#---------------------------------------------------------------------------# | |
# simply call the methods that you would like to use. An example session
# is displayed below. Note that unlike the synchronous version of the
# client, the asynchronous version returns deferreds, which can be thought
# of as a handle to the callback that will receive the result of the
# operation. Here the result is handled by a logging callback and errback;
# the deferred assert helper (dassert) can be used to add checks instead.
#---------------------------------------------------------------------------# | |
def beginAsynchronousTest(client):
    '''
    Send one holding-register read and schedule the shutdown.

    Used as the callback of the connection deferred, so it is called
    with the connected client once the connection is established.

    :param client: The connected modbus client protocol instance
    '''
    io_loop = tornado.ioloop.IOLoop.current()

    def _dump(result):
        logging.info('Register values: %s', result.registers)

    def _err(result):
        logging.error('Error: %s', result)

    rq = client.read_holding_registers(0, 4, unit=1)
    # The errback is added after the callback, so it also reports an error
    # raised inside _dump (e.g. a response without `registers`).
    rq.addCallback(_dump)
    rq.addErrback(_err)

    #-----------------------------------------------------------------------#
    # close the client at some time later
    #-----------------------------------------------------------------------#
    # Drop the connection after one second and stop the loop after two,
    # whether or not the request has completed by then.
    io_loop.add_timeout(io_loop.time() + 1, client.transport.loseConnection)
    io_loop.add_timeout(io_loop.time() + 2, io_loop.stop)
#---------------------------------------------------------------------------# | |
# extra requests | |
#---------------------------------------------------------------------------# | |
# If you are performing a request that is not available in the client | |
# mixin, you have to perform the request like this instead:: | |
# | |
# from pymodbus.diag_message import ClearCountersRequest | |
# from pymodbus.diag_message import ClearCountersResponse | |
# | |
# request = ClearCountersRequest() | |
# response = client.execute(request) | |
# if isinstance(response, ClearCountersResponse): | |
# ... do something with the response | |
# | |
#---------------------------------------------------------------------------# | |
#---------------------------------------------------------------------------# | |
# choose the client you want | |
#---------------------------------------------------------------------------# | |
# make sure to start an implementation to hit against. For this | |
# you can use an existing device, the reference implementation in the tools | |
# directory, or start a pymodbus server. | |
#---------------------------------------------------------------------------# | |
def _abortTest(failure):
    '''
    Report a failed connection (or test start-up) and stop the IO loop.

    Without this, the only place the loop is stopped is the timeout
    scheduled in beginAsynchronousTest, so a connection that never
    succeeds would leave the script running forever.

    :param failure: The failure passed along the deferred's errback chain
    '''
    logging.error('Unable to run modbus test: %s', failure)
    tornado.ioloop.IOLoop.current().stop()


defer = protocol.ClientCreator(
    reactor, ModbusClientProtocol).connectTCP("10.20.30.40", Defaults.Port)
defer.addCallback(beginAsynchronousTest)
# Registered after the callback so it covers both a failed connection and
# an error raised while starting the test.
defer.addErrback(_abortTest)
tornado.ioloop.IOLoop.current().start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment