'''
Bit Writing Request/Response
------------------------------

TODO write mask request/response
'''
import struct

from pymodbus.constants import ModbusStatus
from pymodbus.pdu import ModbusRequest
from pymodbus.pdu import ModbusResponse
from pymodbus.pdu import ModbusExceptions as merror
from pymodbus.exceptions import ParameterException
from pymodbus.utilities import pack_bitstring, unpack_bitstring
#---------------------------------------------------------------------------#
# Local Constants
#---------------------------------------------------------------------------#
# These are defined in the spec to turn a coil on/off.  They are packed once
# at import time as big-endian unsigned 16-bit values so that the encode()
# methods below can simply append the ready-made bytes.
#---------------------------------------------------------------------------#
_turn_coil_on = struct.pack(">H", ModbusStatus.On)
_turn_coil_off = struct.pack(">H", ModbusStatus.Off)
class WriteSingleCoilRequest(ModbusRequest):
    '''
    This function code is used to write a single output to either ON or OFF
    in a remote device.

    The requested ON/OFF state is specified by a constant in the request
    data field. A value of FF 00 hex requests the output to be ON. A value
    of 00 00 requests it to be OFF. All other values are illegal and will
    not affect the output.

    The Request PDU specifies the address of the coil to be forced. Coils
    are addressed starting at zero. Therefore coil numbered 1 is addressed
    as 0. The requested ON/OFF state is specified by a constant in the Coil
    Value field. A value of 0XFF00 requests the coil to be ON. A value of
    0X0000 requests the coil to be off. All other values are illegal and
    will not affect the coil.
    '''
    # Modbus function code for "Write Single Coil"; execute() hands this to
    # the datastore context as self.function_code.
    function_code = 5
    # Total RTU frame length in bytes:
    # unit id (1) + function code (1) + address (2) + value (2) + CRC (2).
    _rtu_frame_size = 8

    def __init__(self, address=None, value=None, **kwargs):
        ''' Initializes a new instance

        :param address: The variable address to write
        :param value: The value to write at address (coerced to a boolean)
        '''
        ModbusRequest.__init__(self, **kwargs)
        self.address = address
        self.value = bool(value)

    def encode(self):
        ''' Encodes write coil request

        :returns: The byte encoded message
        '''
        result = struct.pack('>H', self.address)
        result += _turn_coil_on if self.value else _turn_coil_off
        return result

    def decode(self, data):
        ''' Decodes a write coil request

        :param data: The packet data to decode (address + coil value)
        '''
        self.address, value = struct.unpack('>HH', data)
        # Only the exact ON constant counts as True; every other wire value
        # (including illegal ones) is treated as OFF.
        self.value = (value == ModbusStatus.On)

    def execute(self, context):
        ''' Run a write coil request against a datastore

        :param context: The datastore to request from
        :returns: The populated response or exception message
        '''
        # NOTE: the raw wire value cannot be range-checked here because
        # decode() has already collapsed it to a boolean.
        if not context.validate(self.function_code, self.address, 1):
            return self.doException(merror.IllegalAddress)

        context.setValues(self.function_code, self.address, self.value)
        # Read the coil back so the response echoes what was really stored.
        values = context.getValues(self.function_code, self.address, 1)
        return WriteSingleCoilResponse(self.address, values[0])

    def __str__(self):
        ''' Returns a string representation of the instance

        :return: A string representation of the instance
        '''
        return "WriteCoilRequest(%d, %s) => " % (self.address, self.value)

class WriteSingleCoilResponse(ModbusResponse):
    '''
    The normal response is an echo of the request, returned after the coil
    state has been written.
    '''
    # Modbus function code for "Write Single Coil" (same as the request).
    function_code = 5
    # Total RTU frame length in bytes:
    # unit id (1) + function code (1) + address (2) + value (2) + CRC (2).
    _rtu_frame_size = 8

    def __init__(self, address=None, value=None, **kwargs):
        ''' Initializes a new instance

        :param address: The variable address written to
        :param value: The value written at address
        '''
        ModbusResponse.__init__(self, **kwargs)
        self.address = address
        # Stored as given; encode() and __str__() both read this attribute.
        self.value = value

    def encode(self):
        ''' Encodes write coil response

        :return: The byte encoded message
        '''
        result = struct.pack('>H', self.address)
        result += _turn_coil_on if self.value else _turn_coil_off
        return result

    def decode(self, data):
        ''' Decodes a write coil response

        :param data: The packet data to decode (address + coil value)
        '''
        self.address, value = struct.unpack('>HH', data)
        # Only the exact ON constant is treated as True.
        self.value = (value == ModbusStatus.On)

    def __str__(self):
        ''' Returns a string representation of the instance

        :returns: A string representation of the instance
        '''
        return "WriteCoilResponse(%d) => %d" % (self.address, self.value)

class WriteMultipleCoilsRequest(ModbusRequest):
    '''
    "This function code is used to force each coil in a sequence of coils to
    either ON or OFF in a remote device. The Request PDU specifies the coil
    references to be forced. Coils are addressed starting at zero. Therefore
    coil numbered 1 is addressed as 0.

    The requested ON/OFF states are specified by contents of the request
    data field. A logical '1' in a bit position of the field requests the
    corresponding output to be ON. A logical '0' requests it to be OFF."
    '''
    # Modbus function code for "Write Multiple Coils"; execute() hands this
    # to the datastore context as self.function_code.
    function_code = 15
    # Index of the byte-count field inside a full RTU frame: it follows the
    # unit id (1), function code (1), address (2) and quantity (2).
    _rtu_byte_count_pos = 6

    def __init__(self, address=None, values=None, **kwargs):
        ''' Initializes a new instance

        :param address: The starting request address
        :param values: The values to write (a single value or an iterable)
        '''
        ModbusRequest.__init__(self, **kwargs)
        self.address = address
        if not values:
            values = []
        elif not hasattr(values, '__iter__'):
            # A lone scalar is promoted to a one-element list.
            values = [values]
        self.values = values
        # Floor division keeps the byte count an int on Python 2 and 3.
        self.byte_count = (len(self.values) + 7) // 8

    def encode(self):
        ''' Encodes write coils request

        :returns: The byte encoded message
        '''
        count = len(self.values)
        # Recomputed here because self.values may have changed since
        # construction; 8 coils are packed per byte, rounded up.
        self.byte_count = (count + 7) // 8
        packet = struct.pack('>HHB', self.address, count, self.byte_count)
        packet += pack_bitstring(self.values)
        return packet

    def decode(self, data):
        ''' Decodes a write coils request

        :param data: The packet data to decode
        '''
        self.address, count, self.byte_count = struct.unpack('>HHB', data[0:5])
        values = unpack_bitstring(data[5:])
        # The bitstring is padded to whole bytes; keep only `count` coils.
        self.values = values[:count]

    def execute(self, context):
        ''' Run a write coils request against a datastore

        :param context: The datastore to request from
        :returns: The populated response or exception message
        '''
        count = len(self.values)
        if not (1 <= count <= 0x07b0):
            return self.doException(merror.IllegalValue)
        # The declared byte count must match the number of coils supplied.
        if self.byte_count != (count + 7) // 8:
            return self.doException(merror.IllegalValue)
        if not context.validate(self.function_code, self.address, count):
            return self.doException(merror.IllegalAddress)

        context.setValues(self.function_code, self.address, self.values)
        return WriteMultipleCoilsResponse(self.address, count)

    def __str__(self):
        ''' Returns a string representation of the instance

        :returns: A string representation of the instance
        '''
        params = (self.address, len(self.values))
        return "WriteNCoilRequest (%d) => %d " % params

class WriteMultipleCoilsResponse(ModbusResponse):
    '''
    The normal response returns the function code, starting address, and
    quantity of coils forced.
    '''
    # Modbus function code for "Write Multiple Coils" (same as the request).
    function_code = 15
    # Total RTU frame length in bytes:
    # unit id (1) + function code (1) + address (2) + quantity (2) + CRC (2).
    _rtu_frame_size = 8

    def __init__(self, address=None, count=None, **kwargs):
        ''' Initializes a new instance

        :param address: The starting variable address written to
        :param count: The number of values written
        '''
        ModbusResponse.__init__(self, **kwargs)
        self.address = address
        # encode() and __str__() both read this attribute.
        self.count = count

    def encode(self):
        ''' Encodes write coils response

        :returns: The byte encoded message
        '''
        return struct.pack('>HH', self.address, self.count)

    def decode(self, data):
        ''' Decodes a write coils response

        :param data: The packet data to decode (address + quantity)
        '''
        self.address, self.count = struct.unpack('>HH', data)

    def __str__(self):
        ''' Returns a string representation of the instance

        :returns: A string representation of the instance
        '''
        return "WriteNCoilResponse(%d, %d)" % (self.address, self.count)

#---------------------------------------------------------------------------#
# Exported symbols
#---------------------------------------------------------------------------#
# Bound to __all__ so that `from <module> import *` exposes exactly the four
# PDU classes and not the private coil constants.
__all__ = [
    "WriteSingleCoilRequest", "WriteSingleCoilResponse",
    "WriteMultipleCoilsRequest", "WriteMultipleCoilsResponse",
]