~ampelbein/ubuntu/natty/pyserial/lp-715766

« back to all changes in this revision

Viewing changes to test/test.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2011-01-25 07:59:17 UTC
  • mfrom: (3.1.5 experimental)
  • Revision ID: james.westby@ubuntu.com-20110125075917-m132a8pxfff5a7sf
Tags: 2.5-1
* New upstream version. Closes: #520618.
* Build a python3-serial package.
* Don't use string exception in miniterm.py. Closes: #585328.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#! /usr/bin/env python
 
2
# Python Serial Port Extension for Win32, Linux, BSD, Jython
 
3
# see __init__.py
 
4
#
 
5
# (C) 2001-2008 Chris Liechti <cliechti@gmx.net>
 
6
# this is distributed under a free software license, see license.txt
 
7
 
 
8
"""\
 
9
Some tests for the serial module.
 
10
Part of pyserial (http://pyserial.sf.net)  (C)2001-2009 cliechti@gmx.net
 
11
 
 
12
Intended to be run on different platforms, to ensure portability of
 
13
the code.
 
14
 
 
15
For all these tests a simple hardware is required.
 
16
Loopback HW adapter:
 
17
Shortcut these pin pairs:
 
18
 TX  <-> RX
 
19
 RTS <-> CTS
 
20
 DTR <-> DSR
 
21
 
 
22
On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
 
23
"""
 
24
 
 
25
import unittest
 
26
import threading
 
27
import time
 
28
import sys
 
29
import serial
 
30
 
 
31
# on which port should the tests be performed:
 
32
PORT = 0
 
33
 
 
34
if sys.version_info >= (3, 0):
 
35
    def data(string):
 
36
        return bytes(string, 'latin1')
 
37
    bytes_0to255 = bytes(range(256))
 
38
else:
 
39
    def data(string): return string
 
40
    bytes_0to255 = ''.join([chr(x) for x in range(256)])
 
41
 
 
42
 
 
43
def segments(data, size=16):
 
44
    for a in range(0, len(data), size):
 
45
        yield data[a:a+size]
 
46
 
 
47
 
 
48
class Test4_Nonblocking(unittest.TestCase):
 
49
    """Test with timeouts"""
 
50
    timeout = 0
 
51
 
 
52
    def setUp(self):
 
53
        self.s = serial.serial_for_url(PORT, timeout=self.timeout)
 
54
 
 
55
    def tearDown(self):
 
56
        self.s.close()
 
57
 
 
58
    def test0_Messy(self):
 
59
        """NonBlocking (timeout=0)"""
 
60
        # this is only here to write out the message in verbose mode
 
61
        # because Test3 and Test4 print the same messages
 
62
 
 
63
    def test1_ReadEmpty(self):
 
64
        """timeout: After port open, the input buffer must be empty"""
 
65
        self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer")
 
66
 
 
67
    def test2_Loopback(self):
 
68
        """timeout: each sent character should return (binary test).
 
69
           this is also a test for the binary capability of a port."""
 
70
        for block in segments(bytes_0to255):
 
71
            length = len(block)
 
72
            self.s.write(block)
 
73
            # there might be a small delay until the character is ready (especially on win32)
 
74
            time.sleep(0.05)
 
75
            self.failUnlessEqual(self.s.inWaiting(), length, "expected exactly %d character for inWainting()" % length)
 
76
            self.failUnlessEqual(self.s.read(length), block)#, "expected a %r which was written before" % block)
 
77
        self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read")
 
78
 
 
79
    def test2_LoopbackTimeout(self):
 
80
        """timeout: test the timeout/immediate return.
 
81
        partial results should be returned."""
 
82
        self.s.write(data("HELLO"))
 
83
        time.sleep(0.1)    # there might be a small delay until the character is ready (especially on win32 and rfc2217)
 
84
        # read more characters as are available to run in the timeout
 
85
        self.failUnlessEqual(self.s.read(10), data('HELLO'), "expected the 'HELLO' which was written before")
 
86
        self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read")
 
87
 
 
88
 
 
89
class Test3_Timeout(Test4_Nonblocking):
 
90
    """Same tests as the NonBlocking ones but this time with timeout"""
 
91
    timeout = 1
 
92
 
 
93
    def test0_Messy(self):
 
94
        """Blocking (timeout=1)"""
 
95
        # this is only here to write out the message in verbose mode
 
96
        # because Test3 and Test4 print the same messages
 
97
 
 
98
 
 
99
class SendEvent(threading.Thread):
 
100
    def __init__(self, serial, delay=3):
 
101
        threading.Thread.__init__(self)
 
102
        self.serial = serial
 
103
        self.delay = delay
 
104
        self.x = threading.Event()
 
105
        self.stopped = 0
 
106
        self.start()
 
107
 
 
108
    def run(self):
 
109
        time.sleep(self.delay)
 
110
        if not self.stopped:
 
111
            self.serial.write(data("E"))
 
112
            self.serial.flush()
 
113
        self.x.set()
 
114
 
 
115
    def isSet(self):
 
116
        return self.x.isSet()
 
117
 
 
118
    def stop(self):
 
119
        self.stopped = 1
 
120
        self.x.wait()
 
121
 
 
122
class Test1_Forever(unittest.TestCase):
 
123
    """Tests a port with no timeout. These tests require that a
 
124
    character is sent after some time to stop the test, this is done
 
125
    through the SendEvent class and the Loopback HW."""
 
126
    def setUp(self):
 
127
        self.s = serial.serial_for_url(PORT, timeout=None)
 
128
        self.event = SendEvent(self.s)
 
129
 
 
130
    def tearDown(self):
 
131
        self.event.stop()
 
132
        self.s.close()
 
133
 
 
134
    def test2_ReadEmpty(self):
 
135
        """no timeout: after port open, the input buffer must be empty (read).
 
136
        a character is sent after some time to terminate the test (SendEvent)."""
 
137
        c = self.s.read(1)
 
138
        if not (self.event.isSet() and c == data('E')):
 
139
            self.fail("expected marker")
 
140
 
 
141
 
 
142
class Test2_Forever(unittest.TestCase):
 
143
    """Tests a port with no timeout"""
 
144
    def setUp(self):
 
145
        self.s = serial.serial_for_url(PORT, timeout=None)
 
146
 
 
147
    def tearDown(self):
 
148
        self.s.close()
 
149
 
 
150
    def test1_inWaitingEmpty(self):
 
151
        """no timeout: after port open, the input buffer must be empty (inWaiting)"""
 
152
        self.failUnlessEqual(self.s.inWaiting(), 0, "expected empty buffer")
 
153
 
 
154
    def test2_Loopback(self):
 
155
        """no timeout: each sent character should return (binary test).
 
156
           this is also a test for the binary capability of a port."""
 
157
        for block in segments(bytes_0to255):
 
158
            length = len(block)
 
159
            self.s.write(block)
 
160
            # there might be a small delay until the character is ready (especially on win32 and rfc2217)
 
161
            time.sleep(0.05)
 
162
            self.failUnlessEqual(self.s.inWaiting(), length)#, "expected exactly %d character for inWainting()" % length)
 
163
            self.failUnlessEqual(self.s.read(length), block) #, "expected %r which was written before" % block)
 
164
        self.failUnlessEqual(self.s.inWaiting(), 0, "expected empty buffer after all sent chars are read")
 
165
 
 
166
 
 
167
class Test0_DataWires(unittest.TestCase):
 
168
    """Test modem control lines"""
 
169
    def setUp(self):
 
170
        self.s = serial.serial_for_url(PORT)
 
171
 
 
172
    def tearDown(self):
 
173
        self.s.close()
 
174
 
 
175
    def test1_RTS(self):
 
176
        """Test RTS/CTS"""
 
177
        self.s.setRTS(0)
 
178
        time.sleep(1.1)
 
179
        self.failUnless(not self.s.getCTS(), "CTS -> 0")
 
180
        self.s.setRTS(1)
 
181
        time.sleep(1.1)
 
182
        self.failUnless(self.s.getCTS(), "CTS -> 1")
 
183
 
 
184
    def test2_DTR(self):
 
185
        """Test DTR/DSR"""
 
186
        self.s.setDTR(0)
 
187
        time.sleep(1.1)
 
188
        self.failUnless(not self.s.getDSR(), "DSR -> 0")
 
189
        self.s.setDTR(1)
 
190
        time.sleep(1.1)
 
191
        self.failUnless(self.s.getDSR(), "DSR -> 1")
 
192
 
 
193
    def test3_RI(self):
 
194
        """Test RI"""
 
195
        self.failUnless(not self.s.getRI(), "RI -> 0")
 
196
 
 
197
 
 
198
class Test_MoreTimeouts(unittest.TestCase):
 
199
    """Test with timeouts"""
 
200
    def setUp(self):
 
201
        # create an closed serial port
 
202
        self.s = serial.serial_for_url(PORT, do_not_open=True)
 
203
 
 
204
    def tearDown(self):
 
205
        self.s.close()
 
206
 
 
207
    def test_WriteTimeout(self):
 
208
        """Test write() timeout."""
 
209
        # use xonxoff setting and the loop-back adapter to switch traffic on hold
 
210
        self.s.port = PORT
 
211
        self.s.writeTimeout = 1
 
212
        self.s.xonxoff = 1
 
213
        self.s.open()
 
214
        self.s.write(serial.XOFF)
 
215
        time.sleep(0.5) # some systems need a little delay so that they can react on XOFF
 
216
        t1 = time.time()
 
217
        self.failUnlessRaises(serial.SerialTimeoutException, self.s.write, data("timeout please"*200))
 
218
        t2 = time.time()
 
219
        self.failUnless( 0.9 <= (t2-t1) < 2.1, "Timeout not in the given interval (%s)" % (t2-t1))
 
220
 
 
221
 
 
222
if __name__ == '__main__':
 
223
    import sys
 
224
    sys.stdout.write(__doc__)
 
225
    if len(sys.argv) > 1:
 
226
        PORT = sys.argv[1]
 
227
    sys.stdout.write("Testing port: %r\n" % PORT)
 
228
    sys.argv[1:] = ['-v']
 
229
    # When this module is executed from the command-line, it runs all its tests
 
230
    unittest.main()