~ubuntu-branches/ubuntu/precise/pymodbus/precise

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python
import unittest
from pymodbus.factory import ServerDecoder, ClientDecoder
from pymodbus.exceptions import ModbusException

def _raise_exception(_):
    raise ModbusException('something')

class SimpleFactoryTest(unittest.TestCase):
    '''
    This is the unittest for the pymod.exceptions module
    '''

    def setUp(self):
        ''' Initializes the test environment '''
        self.client  = ClientDecoder()
        self.server  = ServerDecoder()
        self.request = (
                (0x01, '\x01\x00\x01\x00\x01'),                       # read coils
                (0x02, '\x02\x00\x01\x00\x01'),                       # read discrete inputs
                (0x03, '\x03\x00\x01\x00\x01'),                       # read holding registers
                (0x04, '\x04\x00\x01\x00\x01'),                       # read input registers
                (0x05, '\x05\x00\x01\x00\x01'),                       # write single coil
                (0x06, '\x06\x00\x01\x00\x01'),                       # write single register
                (0x07, '\x07'),                                       # read exception status
                (0x08, '\x08\x00\x00\x00\x00'),                       # read diagnostic
                (0x0b, '\x0b'),                                       # get comm event counters
                (0x0c, '\x0c'),                                       # get comm event log
                (0x0f, '\x0f\x00\x01\x00\x08\x01\x00\xff'),           # write multiple coils
                (0x10, '\x10\x00\x01\x00\x02\x04\0xff\xff'),          # write multiple registers
                (0x11, '\x11'),                                       # report slave id
                (0x14, '\x14\x0e\x06\x00\x04\x00\x01\x00\x02' \
                       '\x06\x00\x03\x00\x09\x00\x02'),               # read file record
                (0x15, '\x15\x0d\x06\x00\x04\x00\x07\x00\x03' \
                       '\x06\xaf\x04\xbe\x10\x0d'),                   # write file record
                (0x16, '\x16\x00\x01\x00\xff\xff\x00'),               # mask write register
                (0x17, '\x17\x00\x01\x00\x01\x00\x01\x00\x01\x02\x12\x34'),# read/write multiple registers
                (0x18, '\x18\x00\x01'),                               # read fifo queue
                (0x2b, '\x2b\x0e\x01\x00'),                           # read device identification
        )

        self.response = (
                (0x01, '\x01\x01\x01'),                               # read coils
                (0x02, '\x02\x01\x01'),                               # read discrete inputs
                (0x03, '\x03\x02\x01\x01'),                           # read holding registers
                (0x04, '\x04\x02\x01\x01'),                           # read input registers
                (0x05, '\x05\x00\x01\x00\x01'),                       # write single coil
                (0x06, '\x06\x00\x01\x00\x01'),                       # write single register
                (0x07, '\x07\x00'),                                   # read exception status
                (0x08, '\x08\x00\x00\x00\x00'),                       # read diagnostic
                (0x0b, '\x0b\x00\x00\x00\x00'),                       # get comm event counters
                (0x0c, '\x0c\x08\x00\x00\x01\x08\x01\x21\x20\x00'),   # get comm event log
                (0x0f, '\x0f\x00\x01\x00\x08'),                       # write multiple coils
                (0x10, '\x10\x00\x01\x00\x02'),                       # write multiple registers
                (0x11, '\x11\x03\x05\x01\x54'),                       # report slave id (device specific)
                (0x14, '\x14\x0c\x05\x06\x0d\xfe\x00\x20\x05' \
                       '\x06\x33\xcd\x00\x40'),                       # read file record
                (0x15, '\x15\x0d\x06\x00\x04\x00\x07\x00\x03' \
                       '\x06\xaf\x04\xbe\x10\x0d'),                   # write file record
                (0x16, '\x16\x00\x01\x00\xff\xff\x00'),               # mask write register
                (0x17, '\x17\x02\x12\x34'),                           # read/write multiple registers
                (0x18, '\x18\x00\x01\x00\x01\x00\x00'),               # read fifo queue
                (0x2b, '\x2b\x0e\x01\x01\x00\x00\x01\x00\x01\x77'),   # read device identification
        )

        self.bad = (
                (0x80, '\x80\x00\x00\x00'),                           # Unknown Function
                (0x81, '\x81\x00\x00\x00'),                           # error message
        )

    def tearDown(self):
        ''' Cleans up the test environment '''
        del self.bad
        del self.request
        del self.response

    def testResponseLookup(self):
        ''' Test a working response factory lookup '''
        for func, _ in self.response:
            response = self.client.lookupPduClass(func)
            self.assertNotEqual(response, None)

    def testRequestLookup(self):
        ''' Test a working request factory lookup '''
        for func, _ in self.request:
            request = self.client.lookupPduClass(func)
            self.assertNotEqual(request, None)

    def testResponseWorking(self):
        ''' Test a working response factory decoders '''
        for func, msg in self.response:
            try:
                self.client.decode(msg)
            except ModbusException:
                self.fail("Failed to Decode Response Message", func)

    def testResponseErrors(self):
        ''' Test a response factory decoder exceptions '''
        self.assertRaises(ModbusException, self.client._helper, self.bad[0][1])
        self.assertEqual(self.client.decode(self.bad[1][1]).function_code, self.bad[1][0],
                "Failed to decode error PDU")

    def testRequestsWorking(self):
        ''' Test a working request factory decoders '''
        for func, msg in self.request:
            try:
                self.server.decode(msg)
            except ModbusException:
                self.fail("Failed to Decode Request Message", func)

    def testClientFactoryFails(self):
        ''' Tests that a client factory will fail to decode a bad message '''
        self.client._helper = _raise_exception
        actual = self.client.decode(None)
        self.assertEquals(actual, None)

    def testServerFactoryFails(self):
        ''' Tests that a server factory will fail to decode a bad message '''
        self.server._helper = _raise_exception
        actual = self.server.decode(None)
        self.assertEquals(actual, None)

#---------------------------------------------------------------------------#
# I don't actually know what is supposed to be returned here, I assume that
# since the high bit is set, it will simply echo the resulting message
#---------------------------------------------------------------------------#
    def testRequestErrors(self):
        ''' Test a request factory decoder exceptions '''
        for func, msg in self.bad:
            result = self.server.decode(msg)
            self.assertEqual(result.ErrorCode, 1,
                    "Failed to decode invalid requests")
            self.assertEqual(result.execute(None).function_code, func,
                    "Failed to create correct response message")

#---------------------------------------------------------------------------#
# Main
#---------------------------------------------------------------------------#
if __name__ == "__main__":
    unittest.main()