~ubuntu-branches/ubuntu/quantal/pyserial/quantal

« back to all changes in this revision

Viewing changes to pyserial-2.0/examples/enhancedserial.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2004-08-29 14:49:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20040829144957-moa3k4yx4qte5qth
Tags: 2.1-1
New upstream version.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/env python
2
 
"""Enhanced Serial Port class
3
 
part of pyserial (http://pyserial.sf.net)  (C)2002 cliechti@gmx.net
4
 
 
5
 
another implementation of the readline and readlines method.
6
 
this one should be more efficient because a bunch of characters are read
7
 
on each access, but the drawback is that a timeout must be specified to
8
 
make it work (enforced by the class __init__).
9
 
 
10
 
this class could be enhanced with a read_until() method and more
11
 
like found in the telnetlib.
12
 
"""
13
 
 
14
 
from serial import Serial
15
 
 
16
 
class EnhancedSerial(Serial):
17
 
    def __init__(self, *args, **kwargs):
18
 
        #ensure that a reasonable timeout is set
19
 
        timeout = kwargs.get('timeout',0.1)
20
 
        if timeout < 0.01: timeout = 0.1
21
 
        kwargs['timeout'] = timeout
22
 
        Serial.__init__(self, *args, **kwargs)
23
 
        self.buf = ''
24
 
        
25
 
    def readline(self, maxsize=None, timeout=1):
26
 
        """maxsize is ignored, timeout in seconds is the max time that is way for a complete line"""
27
 
        tries = 0
28
 
        while 1:
29
 
            self.buf += self.read(512)
30
 
            pos = self.buf.find('\n')
31
 
            if pos >= 0:
32
 
                line, self.buf = self.buf[:pos+1], self.buf[pos+1:]
33
 
                return line
34
 
            tries += 1
35
 
            if tries * self.timeout > timeout:
36
 
                break
37
 
        line, self.buf = self.buf, ''
38
 
        return line
39
 
 
40
 
    def readlines(self, sizehint=None, timeout=1):
41
 
        """read all lines that are available. abort after timout
42
 
        when no more data arrives."""
43
 
        lines = []
44
 
        while 1:
45
 
            line = self.readline(timeout=timeout)
46
 
            if line:
47
 
                lines.append(line)
48
 
            if not line or line[-1:] != '\n':
49
 
                break
50
 
        return lines
51
 
 
52
 
if __name__=='__main__':
53
 
    #do some simple tests with a Loopback HW (see test.py for details)
54
 
    PORT = 0
55
 
    #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) )
56
 
    s = EnhancedSerial(PORT)
57
 
    #write out some test data lines
58
 
    s.write('\n'.join("hello how are you".split()))
59
 
    #and read them back
60
 
    print s.readlines()
61
 
    #this one should print an empty list
62
 
    print s.readlines(timeout=0.4)