"""
Pymodbus Asynchronous Client Examples
--------------------------------------------------------------------------

The following is an example of how to use the asynchronous modbus
client implementation from pymodbus.
"""
9
#---------------------------------------------------------------------------#
10
# import needed libraries
11
#---------------------------------------------------------------------------#
12
from twisted.internet import reactor, protocol
13
from pymodbus.constants import Defaults
15
#---------------------------------------------------------------------------#
16
# choose the requested modbus protocol
17
#---------------------------------------------------------------------------#
18
from pymodbus.client.async import ModbusClientProtocol
19
#from pymodbus.client.async import ModbusUdpClientProtocol
21
#---------------------------------------------------------------------------#
22
# configure the client logging
23
#---------------------------------------------------------------------------#
26
# ``logging`` is imported here because no import of it is visible earlier in
# this file; without it the statements below fail with NameError.
import logging

# Attach a default handler to the root logger. Raising the level to DEBUG
# has no visible effect unless a handler is installed to emit the records.
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
29
#---------------------------------------------------------------------------#
30
# helper method to test deferred callbacks
31
#---------------------------------------------------------------------------#
32
def dassert(deferred, callback):
    """Attach a pass/fail check to the eventual result of a deferred.

    :param deferred: object exposing ``addCallback`` and ``addErrback``
        (the client request methods in this example return such objects)
    :param callback: predicate called with the result; a falsy return
        value is treated as a failed check
    :returns: None; the check is registered on ``deferred`` as a side effect
    :raises AssertionError: raised inside the callback chain (not by this
        call) when the predicate is falsy or when the deferred errors
    """
    def _check(result):
        # Raise explicitly instead of using ``assert`` so the check is not
        # silently stripped when Python runs with -O.
        if not callback(result):
            raise AssertionError(
                "deferred result failed the check: %r" % (result,))

    def _report(failure):
        # Registered after _check, so this also receives a failure raised by
        # _check itself; keep the reason in the message instead of dropping it.
        raise AssertionError("deferred failed: %s" % (failure,))

    deferred.addCallback(_check)
    deferred.addErrback(_report)
37
#---------------------------------------------------------------------------#
# example requests
#---------------------------------------------------------------------------#
# simply call the methods that you would like to use. An example session
# is displayed below along with some assert checks. Note that unlike the
# synchronous version of the client, the asynchronous version returns
# deferreds, which can be thought of as handles to the callbacks that will
# receive the result of each operation. We are handling the results using
# the deferred assert helper (dassert).
#---------------------------------------------------------------------------#
47
def beginAsynchronousTest(client):
    """Issue the example request session and schedule the shutdown.

    Every client call below returns a deferred immediately; the expected
    outcome of each one is registered with ``dassert`` rather than checked
    inline.

    :param client: the connected modbus client protocol; it must expose the
        request methods used below and a ``transport`` attribute
    """
    rq = client.write_coil(1, True)
    rr = client.read_coils(1, 1)
    dassert(rq, lambda r: r.function_code < 0x80)   # test that we are not an error
    dassert(rr, lambda r: r.bits[0] == True)        # test the expected value

    rq = client.write_coils(1, [True]*8)
    rr = client.read_coils(1, 8)
    dassert(rq, lambda r: r.function_code < 0x80)   # test that we are not an error
    dassert(rr, lambda r: r.bits == [True]*8)       # test the expected value

    # NOTE(review): the value expected from the discrete inputs is not the
    # value just written to the coils -- presumably it reflects the server's
    # initial datastore contents; confirm against the server in use.
    rq = client.write_coils(1, [False]*8)
    rr = client.read_discrete_inputs(1, 8)
    dassert(rq, lambda r: r.function_code < 0x80)   # test that we are not an error
    dassert(rr, lambda r: r.bits == [True]*8)       # test the expected value

    rq = client.write_register(1, 10)
    rr = client.read_holding_registers(1, 1)
    dassert(rq, lambda r: r.function_code < 0x80)   # test that we are not an error
    dassert(rr, lambda r: r.registers[0] == 10)     # test the expected value

    # NOTE(review): as above, the input registers are expected to hold 17
    # regardless of the write -- presumably the server's initial contents.
    rq = client.write_registers(1, [10]*8)
    rr = client.read_input_registers(1, 8)
    dassert(rq, lambda r: r.function_code < 0x80)   # test that we are not an error
    dassert(rr, lambda r: r.registers == [17]*8)    # test the expected value

    # Write eight registers starting at address 1 and read the same eight
    # back in one request, matching the ``[20]*8`` expectation below.
    arguments = {
        'read_address': 1,
        'read_count': 8,
        'write_address': 1,
        'write_registers': [20]*8,
    }
    rq = client.readwrite_registers(**arguments)
    rr = client.read_input_registers(1, 8)
    dassert(rq, lambda r: r.registers == [20]*8)    # test the expected value
    dassert(rr, lambda r: r.registers == [17]*8)    # test the expected value

    #-----------------------------------------------------------------------#
    # close the client at some time later
    #-----------------------------------------------------------------------#
    # Drop the connection after one second, then stop the reactor a second
    # later so the disconnect has time to complete.
    reactor.callLater(1, client.transport.loseConnection)
    reactor.callLater(2, reactor.stop)
90
#---------------------------------------------------------------------------#
91
# choose the client you want
92
#---------------------------------------------------------------------------#
93
# make sure to start an implementation to hit against. For this
94
# you can use an existing device, the reference implementation in the tools
95
# directory, or start a pymodbus server.
96
#---------------------------------------------------------------------------#
97
# Open a TCP connection to the local modbus server; once the connection
# deferred fires, beginAsynchronousTest is called with its result.
defer = protocol.ClientCreator(
    reactor, ModbusClientProtocol
).connectTCP("localhost", Defaults.Port)
defer.addCallback(beginAsynchronousTest)