Skip to content

Instantly share code, notes, and snippets.

@sjlongland
Created May 21, 2015 03:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sjlongland/2381673e7fd1c8d53168 to your computer and use it in GitHub Desktop.
Save sjlongland/2381673e7fd1c8d53168 to your computer and use it in GitHub Desktop.
Using pymodbus in TornadoWeb IO loop
#!/usr/bin/env python
'''
Pymodbus Asynchronous Client Examples -- using Tornado
--------------------------------------------------------------------------
The following is an example of how to use the asynchronous modbus
client implementation from pymodbus.
'''
#---------------------------------------------------------------------------#
# import needed libraries
#---------------------------------------------------------------------------#
import tornado
import tornado.platform.twisted
tornado.platform.twisted.install()
from twisted.internet import reactor, protocol
from pymodbus.constants import Defaults
#---------------------------------------------------------------------------#
# choose the requested modbus protocol
#---------------------------------------------------------------------------#
from pymodbus.client.async import ModbusClientProtocol
#from pymodbus.client.async import ModbusUdpClientProtocol
#---------------------------------------------------------------------------#
# configure the client logging
#---------------------------------------------------------------------------#
import logging
# basicConfig() attaches a default handler to the root logger if it has none.
logging.basicConfig()
# getLogger() with no name returns the root logger, so the DEBUG level set
# below applies to every logger that does not set a level of its own.
log = logging.getLogger()
log.setLevel(logging.DEBUG)
#---------------------------------------------------------------------------#
# helper method to test deferred callbacks
#---------------------------------------------------------------------------#
def dassert(deferred, callback):
    '''
    Attach a truthiness check to a deferred.

    :param deferred: object exposing ``addCallback`` and ``addErrback``
    :param callback: called with the deferred's result; it must return a
                     truthy value for the check to pass
    :raises AssertionError: raised inside the callback chain when
                            ``callback`` returns a falsy value, or when the
                            deferred itself fails
    '''
    def _check(result):
        # Raise explicitly: a bare ``assert`` statement is compiled out under
        # ``python -O`` and the check would silently disappear.
        if not callback(result):
            raise AssertionError('deferred check failed for %r' % (result,))

    def _failed(failure):
        # Keep the failure in the message instead of discarding it, so the
        # reason for the error is still visible.
        raise AssertionError('deferred failed: %s' % (failure,))

    deferred.addCallback(_check)
    # Added after the callback on purpose: it therefore also sees errors
    # raised by the check itself, as in the original ordering.
    deferred.addErrback(_failed)
#---------------------------------------------------------------------------#
# specify slave to query
#---------------------------------------------------------------------------#
# The slave to query is specified in an optional parameter for each
# individual request. This can be done by specifying the `unit` parameter
# which defaults to `0x00`
#---------------------------------------------------------------------------#
def exampleRequests(client):
    '''
    Issue an example single-coil read addressed to slave unit 0x02.

    :param client: modbus client exposing ``read_coils``
    :returns: whatever ``client.read_coils`` returns (a deferred for the
              asynchronous client), so the caller can attach callbacks
    '''
    # Hand the request handle back instead of dropping it in an unused local.
    return client.read_coils(1, 1, unit=0x02)
#---------------------------------------------------------------------------#
# example requests
#---------------------------------------------------------------------------#
# Simply call the methods that you would like to use. An example session
# is shown below. Note that unlike the synchronous version of the client,
# the asynchronous version returns deferreds, which can be thought of as
# a handle to which you attach the callbacks that receive the result of
# the operation. Here the result is handled by logging callbacks; the
# deferred assert helper (dassert) can be used for checks instead.
#---------------------------------------------------------------------------#
def beginAsynchronousTest(client):
    '''
    Read four holding registers (address 0, unit 1) and log the outcome.

    After issuing the request, the connection is scheduled to be dropped one
    second from now and the IO loop to be stopped two seconds from now.

    :param client: connected protocol instance; must expose
                   ``read_holding_registers`` and ``transport``
    '''
    def _logRegisters(response):
        logging.info('Register values: %s', response.registers)

    def _logFailure(failure):
        logging.error('Error: %s', failure)

    loop = tornado.ioloop.IOLoop.current()

    request = client.read_holding_registers(0, 4, unit=1)
    request.addCallback(_logRegisters)
    # Registered after the callback, so it also reports errors raised while
    # logging the response.
    request.addErrback(_logFailure)

    #-----------------------------------------------------------------------#
    # shut down: drop the connection first, then stop the loop
    #-----------------------------------------------------------------------#
    closeAt = loop.time() + 1
    loop.add_timeout(closeAt, client.transport.loseConnection)
    stopAt = loop.time() + 2
    loop.add_timeout(stopAt, loop.stop)
#---------------------------------------------------------------------------#
# extra requests
#---------------------------------------------------------------------------#
# If you are performing a request that is not available in the client
# mixin, you have to perform the request like this instead::
#
# from pymodbus.diag_message import ClearCountersRequest
# from pymodbus.diag_message import ClearCountersResponse
#
# request = ClearCountersRequest()
# response = client.execute(request)
# if isinstance(response, ClearCountersResponse):
# ... do something with the response
#
#---------------------------------------------------------------------------#
#---------------------------------------------------------------------------#
# choose the client you want
#---------------------------------------------------------------------------#
# make sure to start an implementation to hit against. For this
# you can use an existing device, the reference implementation in the tools
# directory, or start a pymodbus server.
#---------------------------------------------------------------------------#
def _setupFailed(failure):
    '''Log a failed connection or test setup and stop the IO loop.'''
    logging.error('Error: %s', failure)
    tornado.ioloop.IOLoop.current().stop()

defer = protocol.ClientCreator(
    reactor, ModbusClientProtocol
).connectTCP("10.20.30.40", Defaults.Port)
defer.addCallback(beginAsynchronousTest)
# Without an errback, a refused or timed-out connection would never reach
# beginAsynchronousTest, so nothing would schedule io_loop.stop and the
# loop below would run forever.
defer.addErrback(_setupFailed)
tornado.ioloop.IOLoop.current().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment